Author: xuefu
Date: Fri Feb 26 14:37:50 2016
New Revision: 1732492
URL: http://svn.apache.org/viewvc?rev=1732492&view=rev
Log:
PIG-4781: Fix remaining unit failure about TestCollectedGroup for spark engine
(Liyun via Xuefu)
Modified:
pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
Modified: pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java?rev=1732492&r1=1732491&r2=1732492&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java
(original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCollectedGroup.java Fri Feb
26 14:37:50 2016
@@ -294,38 +294,40 @@ public class TestCollectedGroup {
@Test
public void testMapsideGroupWithMergeJoin() throws IOException{
- pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
- pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using
"+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
- pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using
"+DummyCollectableLoader.class.getName() +"() as (id, name, grade);");
- try {
- DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
- DataBag dbshj = BagFactory.getInstance().newDefaultBag();
- {
- pigServer.registerQuery("C = join A by id, B by id using
'merge';");
- pigServer.registerQuery("D = group C by A::id using
'collected';");
- pigServer.registerQuery("E = foreach D generate group,
COUNT(C);");
- Iterator<Tuple> iter = pigServer.openIterator("E");
+ if( !Util.isSparkExecType(cluster.getExecType())) {
+ pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
+ pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' using " +
DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
+ pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' using " +
DummyCollectableLoader.class.getName() + "() as (id, name, grade);");
+ try {
+ DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+ DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+ {
+ pigServer.registerQuery("C = join A by id, B by id using
'merge';");
+ pigServer.registerQuery("D = group C by A::id using
'collected';");
+ pigServer.registerQuery("E = foreach D generate group,
COUNT(C);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
- while (iter.hasNext()) {
- dbfrj.add(iter.next());
+ while (iter.hasNext()) {
+ dbfrj.add(iter.next());
+ }
}
- }
- {
- pigServer.registerQuery("F = join A by id, B by id;");
- pigServer.registerQuery("G = group F by A::id;");
- pigServer.registerQuery("H = foreach G generate group,
COUNT(F);");
- Iterator<Tuple> iter = pigServer.openIterator("H");
+ {
+ pigServer.registerQuery("F = join A by id, B by id;");
+ pigServer.registerQuery("G = group F by A::id;");
+ pigServer.registerQuery("H = foreach G generate group,
COUNT(F);");
+ Iterator<Tuple> iter = pigServer.openIterator("H");
- while (iter.hasNext()) {
- dbshj.add(iter.next());
+ while (iter.hasNext()) {
+ dbshj.add(iter.next());
+ }
}
- }
- Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
- Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+ Assert.assertTrue(dbfrj.size() > 0 && dbshj.size() > 0);
+ Assert.assertEquals(true, TestHelper.compareBags(dbfrj,
dbshj));
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail(e.getMessage());
+ } catch (Exception e) {
+ e.printStackTrace();
+ Assert.fail(e.getMessage());
+ }
}
}