Author: xuefu
Date: Thu Feb 18 14:01:14 2016
New Revision: 1731076

URL: http://svn.apache.org/viewvc?rev=1731076&view=rev
Log:
Revert PIG-4601: Implement Merge CoGroup for Spark engine (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
    pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Thu Feb 18 14:01:14 2016
@@ -55,7 +55,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
@@ -77,7 +76,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.LimitConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.LoadConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.LocalRearrangeConverter;
-import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeCogroupConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.MergeJoinConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.PackageConverter;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.converter.RDDConverter;
@@ -204,7 +202,6 @@ public class SparkLauncher extends Launc
         convertMap.put(PORank.class, new RankConverter());
         convertMap.put(POStream.class, new StreamConverter(confBytes));
         convertMap.put(POFRJoin.class, new FRJoinConverter());
-        convertMap.put(POMergeCogroup.class, new MergeCogroupConverter());
         convertMap.put(POReduceBySpark.class, new ReduceByConverter());
         convertMap.put(POPreCombinerLocalRearrange.class, new 
LocalRearrangeConverter());
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkUtil.java Thu Feb 18 14:01:14 2016
@@ -18,38 +18,29 @@
 package org.apache.pig.backend.hadoop.executionengine.spark;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
-
-import scala.Product2;
-import scala.Tuple2;
-import scala.collection.JavaConversions;
-import scala.collection.Seq;
-import scala.reflect.ClassTag;
-import scala.reflect.ClassTag$;
+import java.util.UUID;
 
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
-import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
-import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.spark.HashPartitioner;
 import org.apache.spark.Partitioner;
 import org.apache.spark.rdd.RDD;
 
+import scala.Product2;
+import scala.Tuple2;
+import scala.collection.JavaConversions;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.reflect.ClassTag$;
+
 public class SparkUtil {
 
     public static <T> ClassTag<T> getManifest(Class<T> clazz) {
@@ -128,25 +119,4 @@ public class SparkUtil {
             return new MapReducePartitionerWrapper(customPartitioner, 
parallelism);
         }
     }
-
-    // createIndexerSparkNode is a utility to create an indexer spark node 
with baseSparkOp
-    static public void createIndexerSparkNode(SparkOperator baseSparkOp, 
String scope, NodeIdGenerator nig) throws PlanException, ExecException {
-        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
-        PhysicalPlan ep = new PhysicalPlan();
-        POProject prj = new POProject(new OperatorKey(scope,
-                nig.getNextNodeId(scope)));
-        prj.setStar(true);
-        prj.setOverloaded(false);
-        prj.setResultType(DataType.TUPLE);
-        ep.add(prj);
-        eps.add(ep);
-
-        List<Boolean> ascCol = new ArrayList<Boolean>();
-        ascCol.add(true);
-
-        int requestedParallelism = baseSparkOp.requestedParallelism;
-        POSort sort = new POSort(new OperatorKey(scope, 
nig.getNextNodeId(scope)), requestedParallelism, null, eps, ascCol, null);
-        //POSort is added to sort the index tuples genereated by 
MergeJoinIndexer.More detail, see PIG-4601
-        baseSparkOp.physicalPlan.addAsLeaf(sort);
-    }
-}
\ No newline at end of file
+}

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/plan/SparkCompiler.java Thu Feb 18 14:01:14 2016
@@ -18,7 +18,6 @@
 package org.apache.pig.backend.hadoop.executionengine.spark.plan;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -31,16 +30,10 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.CollectableLoadFunc;
-import org.apache.pig.FuncSpec;
-import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
-import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigException;
-import org.apache.pig.backend.executionengine.ExecException;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.UDFFinder;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -54,7 +47,6 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -68,11 +60,11 @@ import org.apache.pig.backend.hadoop.exe
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
-import org.apache.pig.backend.hadoop.executionengine.spark.SparkUtil;
+import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import 
org.apache.pig.backend.hadoop.executionengine.spark.operator.POGlobalRearrangeSpark;
+import org.apache.pig.builtin.LOG;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -81,8 +73,6 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
-import org.apache.pig.impl.util.Utils;
 
 /**
  * The compiler that compiles a given physical physicalPlan into a DAG of Spark
@@ -811,173 +801,4 @@ public class SparkCompiler extends PhyPl
                        finPlan.merge(e);
                }
        }
-
-    @Override
-    public void visitMergeCoGroup(POMergeCogroup poCoGrp) throws 
VisitorException {
-        if (compiledInputs.length < 2) {
-            int errCode = 2251;
-            String errMsg = "Merge Cogroup work on two or more relations." +
-                    "To use map-side group-by on single relation, use 
'collected' qualifier.";
-            throw new SparkCompilerException(errMsg, errCode);
-        }
-
-        List<FuncSpec> funcSpecs = new 
ArrayList<FuncSpec>(compiledInputs.length - 1);
-        List<String> fileSpecs = new ArrayList<String>(compiledInputs.length - 
1);
-        List<String> loaderSigns = new ArrayList<String>(compiledInputs.length 
- 1);
-
-        try {
-            poCoGrp.setEndOfRecordMark(POStatus.STATUS_NULL);
-
-            // Iterate through all the SparkOpererators, disconnect side 
SparkOperators from
-            // SparkOperator and collect all the information needed in 
different lists.
-
-            for (int i = 0; i < compiledInputs.length; i++) {
-                SparkOperator sparkOper = compiledInputs[i];
-                PhysicalPlan plan = sparkOper.physicalPlan;
-                if (plan.getRoots().size() != 1) {
-                    int errCode = 2171;
-                    String errMsg = "Expected one but found more then one root 
physical operator in physical plan.";
-                    throw new SparkCompilerException(errMsg, errCode, 
PigException.BUG);
-                }
-
-                PhysicalOperator rootPOOp = plan.getRoots().get(0);
-                if (!(rootPOOp instanceof POLoad)) {
-                    int errCode = 2172;
-                    String errMsg = "Expected physical operator at root to be 
POLoad. Found : " + rootPOOp.getClass().getCanonicalName();
-                    throw new SparkCompilerException(errMsg, errCode);
-                }
-
-                POLoad sideLoader = (POLoad) rootPOOp;
-                FileSpec loadFileSpec = sideLoader.getLFile();
-                FuncSpec funcSpec = loadFileSpec.getFuncSpec();
-                LoadFunc loadfunc = sideLoader.getLoadFunc();
-                if (i == 0) {
-
-                    if 
(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
-                        int errCode = 2252;
-                        throw new SparkCompilerException("Base loader in 
Cogroup must implement CollectableLoadFunc.", errCode);
-                    }
-
-                    ((CollectableLoadFunc) 
loadfunc).ensureAllKeyInstancesInSameSplit();
-                    continue;
-                }
-                if 
(!(IndexableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))) {
-                    int errCode = 2253;
-                    throw new SparkCompilerException("Side loaders in cogroup 
must implement IndexableLoadFunc.", errCode);
-                }
-
-                funcSpecs.add(funcSpec);
-                fileSpecs.add(loadFileSpec.getFileName());
-                loaderSigns.add(sideLoader.getSignature());
-                sparkPlan.remove(sparkOper);
-            }
-
-            poCoGrp.setSideLoadFuncs(funcSpecs);
-            poCoGrp.setSideFileSpecs(fileSpecs);
-            poCoGrp.setLoaderSignatures(loaderSigns);
-
-            // Use spark operator of base relation for the cogroup operation.
-            SparkOperator baseSparkOp = 
phyToSparkOpMap.get(poCoGrp.getInputs().get(0));
-
-            // Create a spark operator to generate index file for tuples from 
leftmost relation
-            SparkOperator indexerSparkOp = getSparkOp();
-            FileSpec idxFileSpec = getIndexingJob(indexerSparkOp, baseSparkOp, 
poCoGrp.getLRInnerPlansOf(0));
-            poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
-            poCoGrp.setIndexFileName(idxFileSpec.getFileName());
-
-            baseSparkOp.physicalPlan.addAsLeaf(poCoGrp);
-            for (FuncSpec funcSpec : funcSpecs)
-                baseSparkOp.UDFs.add(funcSpec.toString());
-
-            sparkPlan.add(indexerSparkOp);
-            sparkPlan.connect(indexerSparkOp, baseSparkOp);
-            phyToSparkOpMap.put(poCoGrp, baseSparkOp);
-            curSparkOp = baseSparkOp;
-        } catch (ExecException e) {
-            throw new SparkCompilerException(e.getDetailedMessage(), 
e.getErrorCode(), e.getErrorSource(), e);
-        } catch (SparkCompilerException mrce) {
-            throw (mrce);
-        } catch (CloneNotSupportedException e) {
-            throw new SparkCompilerException(e);
-        } catch (PlanException e) {
-            int errCode = 2034;
-            String msg = "Error compiling operator " + 
poCoGrp.getClass().getCanonicalName();
-            throw new SparkCompilerException(msg, errCode, PigException.BUG, 
e);
-        } catch (IOException e) {
-            int errCode = 3000;
-            String errMsg = "IOException caught while compiling 
POMergeCoGroup";
-            throw new SparkCompilerException(errMsg, errCode, e);
-        }
-    }
-
-    // Sets up the indexing job for single-stage cogroups.
-    private FileSpec getIndexingJob(SparkOperator indexerSparkOp,
-                                    final SparkOperator baseSparkOp, final 
List<PhysicalPlan> mapperLRInnerPlans)
-            throws SparkCompilerException, PlanException, ExecException, 
IOException, CloneNotSupportedException {
-
-        // First replace loader with  MergeJoinIndexer.
-        PhysicalPlan baseMapPlan = baseSparkOp.physicalPlan;
-        POLoad baseLoader = (POLoad) baseMapPlan.getRoots().get(0);
-        FileSpec origLoaderFileSpec = baseLoader.getLFile();
-        FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
-        LoadFunc loadFunc = baseLoader.getLoadFunc();
-
-        if (!(OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))) {
-            int errCode = 1104;
-            String errMsg = "Base relation of merge-coGroup must implement " +
-                    "OrderedLoadFunc interface. The specified loader "
-                    + funcSpec + " doesn't implement it";
-            throw new SparkCompilerException(errMsg, errCode);
-        }
-
-        String[] indexerArgs = new String[6];
-        indexerArgs[0] = funcSpec.toString();
-        indexerArgs[1] = ObjectSerializer.serialize((Serializable) 
mapperLRInnerPlans);
-        indexerArgs[3] = baseLoader.getSignature();
-        indexerArgs[4] = baseLoader.getOperatorKey().scope;
-        indexerArgs[5] = Boolean.toString(false); // we care for nulls.
-
-        PhysicalPlan phyPlan;
-        if (baseMapPlan.getSuccessors(baseLoader) == null
-                || baseMapPlan.getSuccessors(baseLoader).isEmpty()) {
-            // Load-Load-Cogroup case.
-            phyPlan = null;
-        } else { // We got something. Yank it and set it as inner plan.
-            phyPlan = baseMapPlan.clone();
-            PhysicalOperator root = phyPlan.getRoots().get(0);
-            phyPlan.disconnect(root, phyPlan.getSuccessors(root).get(0));
-            phyPlan.remove(root);
-
-        }
-        indexerArgs[2] = ObjectSerializer.serialize(phyPlan);
-
-        POLoad idxJobLoader = getLoad();
-        idxJobLoader.setLFile(new FileSpec(origLoaderFileSpec.getFileName(),
-                new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
-        indexerSparkOp.physicalPlan.add(idxJobLoader);
-        
indexerSparkOp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
-
-        // Loader of sparkOp will return a tuple of form -
-        // (key1, key2, .. , WritableComparable, splitIndex). See 
MergeJoinIndexer for details.
-        // Create a spark node to retrieve index file by MergeJoinIndexer
-        SparkUtil.createIndexerSparkNode(indexerSparkOp, scope, nig);
-
-        POStore st = getStore();
-        FileSpec strFile = getTempFileSpec();
-        st.setSFile(strFile);
-        indexerSparkOp.physicalPlan.addAsLeaf(st);
-
-        return strFile;
-    }
-
-    /**
-     * Returns a temporary DFS Path
-     *
-     * @return
-     * @throws IOException
-     */
-    private FileSpec getTempFileSpec() throws IOException {
-        return new 
FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
-                new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
-    }
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/running/PigInputFormatSpark.java Thu Feb 18 14:01:14 2016
@@ -22,17 +22,13 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
-import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 
@@ -44,18 +40,12 @@ public class PigInputFormatSpark extends
                        InterruptedException {
         init();
         resetUDFContext();
+        RecordReader recordReader = super.createRecordReader(split, context);
         //PigSplit#conf is the default hadoop configuration, we need get the 
configuration
         //from context.getConfigration() to retrieve pig properties
         PigSplit pigSplit = (PigSplit) split;
-        Configuration conf = context.getConfiguration();
-        pigSplit.setConf(conf);
-        //Set current splitIndex in 
PigMapReduce.sJobContext.getConfiguration.get(PigImplConstants.PIG_SPLIT_INDEX)
-        //which will be used in POMergeCogroup#setup
-        if (PigMapReduce.sJobContext == null) {
-            PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new 
JobID());
-        }
-        
PigMapReduce.sJobContext.getConfiguration().setInt(PigImplConstants.PIG_SPLIT_INDEX,
 pigSplit.getSplitIndex());
-        return super.createRecordReader(split, context);
+        pigSplit.setConf(context.getConfiguration());
+        return recordReader;
     }
 
        private void resetUDFContext() {

Modified: pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java?rev=1731076&r1=1731075&r2=1731076&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMapSideCogroup.java Thu Feb 18 14:01:14 2016
@@ -311,11 +311,12 @@ public class TestMapSideCogroup {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "," + INPUT_FILE4 
+ "' using "+ DummyCollectableLoader.class.getName() +"() as 
(c1:chararray,c2:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' using "+ 
DummyIndexableLoader.class.getName()   +"() as (c1:chararray,c2:int);");
 
-        List<Tuple> dbMergeCogrp = new ArrayList<Tuple>();
+        DataBag dbMergeCogrp = BagFactory.getInstance().newDefaultBag();
 
         pigServer.registerQuery("C = cogroup A by c1, B by c1 using 'merge';");
         Iterator<Tuple> iter = pigServer.openIterator("C");
 
+
         while(iter.hasNext()) {
             Tuple t = iter.next();
             dbMergeCogrp.add(t);
@@ -334,29 +335,12 @@ public class TestMapSideCogroup {
                 "(3,{(3,3),(3,2),(3,1)},{(3,1),(3,2),(3,3)})"
         };
 
-        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(results);
-
-        //We need sort dbMergeCogrp because the result is different in 
sequence between spark and other mode when
-        //multiple files are loaded(LOAD INPUT_FILE1,INPUT_FILE4...)
-        for (Tuple t : dbMergeCogrp) {
-            Util.convertBagToSortedBag(t);
-        }
-        for (Tuple t : expected) {
-            Util.convertBagToSortedBag(t);
-        }
-
-        Collections.sort(dbMergeCogrp);
-        Collections.sort(expected);
-        assertEquals(dbMergeCogrp.size(), expected.size());
-
-        //Since TestMapSideCogroup.DummyIndexableLoader.getNext() does not
-        //apply schema for each input 
tuple,Util#checkQueryOutputsAfterSortRecursive fails to assert.
-        // The schema for C is 
(int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,int),(chararray,int),(chararray,int)}).
-        //But the schema for result "dbMergeCogrp" is 
(int,{(chararray,int),(chararray,int),(chararray,int)},{(chararray,chararray),(chararray,chararray),(chararray,chararray)})
+        assertEquals(9, dbMergeCogrp.size());
         Iterator<Tuple> itr = dbMergeCogrp.iterator();
-        for (int i = 0; i < dbMergeCogrp.size(); i++) {
-            assertEquals(itr.next().toString(), expected.get(i).toString());
+        for(int i=0; i<9; i++){
+            assertEquals(itr.next().toString(), results[i]);   
         }
+        assertFalse(itr.hasNext());
     }
 
     @Test


Reply via email to