Author: rohini
Date: Thu Jan 19 13:45:13 2017
New Revision: 1779463

URL: http://svn.apache.org/viewvc?rev=1779463&view=rev
Log:
PIG-5046: Skewed join with auto parallelism hangs when right input also has 
autoparallelism (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1779463&r1=1779462&r2=1779463&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jan 19 13:45:13 2017
@@ -187,6 +187,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-5046: Skewed join with auto parallelism hangs when right input also has autoparallelism (rohini)
+
 PIG-5108: AvroStorage on Tez with exception on nested records (daijy)
 
 PIG-4260: SpillableMemoryManager.spill should revert spill on all exception (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1779463&r1=1779462&r2=1779463&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Thu Jan 19 13:45:13 2017
@@ -17,23 +17,25 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -46,8 +48,13 @@ import com.google.common.collect.Lists;
 public class PartitionerDefinedVertexManager extends VertexManagerPlugin {
     private static final Log LOG = 
LogFactory.getLog(PartitionerDefinedVertexManager.class);
 
-    private boolean isParallelismSet = false;
+    private volatile boolean parallelismSet;
     private int dynamicParallelism = -1;
+    private int numConfiguredSources;
+    private int numSources = -1;
+    private volatile boolean configured;
+    private volatile boolean started;
+    private volatile boolean scheduled;
 
     public PartitionerDefinedVertexManager(VertexManagerPluginContext context) 
{
         super(context);
@@ -55,7 +62,31 @@ public class PartitionerDefinedVertexMan
 
     @Override
     public void initialize() {
-        // Nothing to do
+        // this will prevent vertex from starting until we notify we are done
+        getContext().vertexReconfigurationPlanned();
+        parallelismSet = false;
+        numConfiguredSources = 0;
+        configured = false;
+        started = false;
+        numSources = getContext().getInputVertexEdgeProperties().size();
+        // wait for sources and self to start
+        Map<String, EdgeProperty> edges = 
getContext().getInputVertexEdgeProperties();
+        for (String entry : edges.keySet()) {
+            getContext().registerForVertexStateUpdates(entry, 
EnumSet.of(VertexState.CONFIGURED));
+        }
+    }
+
+    @Override
+    public synchronized void onVertexStateUpdated(VertexStateUpdate 
stateUpdate)
+            throws Exception {
+        numConfiguredSources++;
+        LOG.info("For vertex: " + getContext().getVertexName() + " Received 
configured signal from: "
+            + stateUpdate.getVertexName() + " numConfiguredSources: " + 
numConfiguredSources
+            + " needed: " + numSources);
+        Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: 
" + getContext().getVertexName());
+        if (numConfiguredSources == numSources) {
+            configure();
+        }
     }
 
     @Override
@@ -73,10 +104,9 @@ public class PartitionerDefinedVertexMan
     public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) 
throws Exception {
         // There could be multiple partition vertex sending VertexManagerEvent
         // Only need to setVertexParallelism once
-        if (isParallelismSet) {
+        if (parallelismSet) {
             return;
         }
-        isParallelismSet = true;
         // Need to distinguish from VertexManagerEventPayloadProto emitted by 
OrderedPartitionedKVOutput
         if (vmEvent.getUserPayload().limit()==4) {
             dynamicParallelism = vmEvent.getUserPayload().getInt();
@@ -96,18 +126,50 @@ public class PartitionerDefinedVertexMan
                     edgeManagers.put(entry.getKey(), edge);
                 }
                 getContext().reconfigureVertex(dynamicParallelism, null, 
edgeManagers);
+                parallelismSet = true;
+                configure();
             }
         }
     }
 
-    @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
-        if (dynamicParallelism != -1) {
+    private void configure() {
+        if(parallelismSet && (numSources == numConfiguredSources)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Done reconfiguring vertex " + 
getContext().getVertexName());
+            }
+            getContext().doneReconfiguringVertex();
+            configured = true;
+            trySchedulingTasks();
+        }
+    }
+
+    private synchronized void trySchedulingTasks() {
+        if (configured && started && !scheduled) {
+            LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " 
+ getContext().getVertexName());
             List<TaskWithLocationHint> tasksToStart = 
Lists.newArrayListWithCapacity(dynamicParallelism);
-            for (int i=0; i<dynamicParallelism; ++i) {
+            for (int i = 0; i < dynamicParallelism; ++i) {
                 tasksToStart.add(new TaskWithLocationHint(new Integer(i), 
null));
             }
             getContext().scheduleVertexTasks(tasksToStart);
+            scheduled = true;
         }
     }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+        // This vertex manager will be getting the following calls
+        //   1) onVertexManagerEventReceived - Parallelism vertex manager 
event sent by sample aggregator vertex
+        //   2) onVertexStateUpdated - Vertex CONFIGURED status updates from
+        //       - Order by Partitioner vertex (1-1) in case of Order by
+        //       - Skewed Join Left Partitioner (1-1) and Right Input Vertices 
in case of SkewedJoin
+        //   3) onVertexStarted
+        // Calls 2) and 3) can happen in any order. So we should schedule tasks
+        // only after start is called and configuration is also complete
+        started = true;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Vertex start received for " + 
getContext().getVertexName());
+        }
+        trySchedulingTasks();
+    }
+
 }

Modified: pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1779463&r1=1779462&r2=1779463&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Thu Jan 19 13:45:13 2017
@@ -36,6 +36,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
@@ -47,6 +48,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -62,12 +64,23 @@ public class TestTezAutoParallelism {
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
+    private static final PathFilter PART_FILE_FILTER = new PathFilter() {
+        @Override
+        public boolean accept(Path path) {
+            if (path.getName().startsWith("part")) {
+                return true;
+            }
+            return false;
+        }
+    };
+
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         cluster = 
MiniGenericCluster.buildCluster(MiniGenericCluster.EXECTYPE_TEZ);
         properties = cluster.getProperties();
         //Test spilling to disk as tests here have multiple splits
         
properties.setProperty(PigConfiguration.PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD, 
"10");
+        properties.setProperty(PigConfiguration.PIG_TEZ_GRACE_PARALLELISM, 
"false");
         createFiles();
     }
 
@@ -84,6 +97,11 @@ public class TestTezAutoParallelism {
 
     @After
     public void tearDown() throws Exception {
+        removeProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION);
+        removeProperty(MRConfiguration.MAX_SPLIT_SIZE);
+        removeProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM);
+        removeProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+        removeProperty(TezConfiguration.TEZ_AM_LOG_LEVEL);
         pigServer.shutdown();
         pigServer = null;
     }
@@ -131,23 +149,15 @@ public class TestTezAutoParallelism {
     @Test
     public void testGroupBy() throws IOException{
         // parallelism is 3 originally, reduce to 1
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name;");
         pigServer.store("B", "output1");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output1"), new 
PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output1"), 
PART_FILE_FILTER);
         assertEquals(files.length, 1);
         fs.delete(new Path("output1"), true);
     }
@@ -158,9 +168,9 @@ public class TestTezAutoParallelism {
         NodeIdGenerator.reset();
         PigServer.resetScope();
 
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "1000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
 
         StringWriter writer = new StringWriter();
         Util.createLogAppender("testAutoParallelism", writer, 
TezDagBuilder.class);
@@ -169,15 +179,7 @@ public class TestTezAutoParallelism {
             pigServer.registerQuery("B = group A by name;");
             pigServer.store("B", "output1");
             FileSystem fs = cluster.getFileSystem();
-            FileStatus[] files = fs.listStatus(new Path("output1"), new 
PathFilter(){
-                @Override
-                public boolean accept(Path path) {
-                    if (path.getName().startsWith("part")) {
-                        return true;
-                    }
-                    return false;
-                }
-            });
+            FileStatus[] files = fs.listStatus(new Path("output1"), 
PART_FILE_FILTER);
             assertEquals(files.length, 10);
             String log = writer.toString();
             assertTrue(log.contains("For vertex - scope-13: parallelism=3"));
@@ -191,9 +193,9 @@ public class TestTezAutoParallelism {
     @Test
     public void testOrderbyDecreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, reduce to 1
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
@@ -201,86 +203,54 @@ public class TestTezAutoParallelism {
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output2");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output2"), new 
PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output2"), 
PART_FILE_FILTER);
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testOrderbyIncreaseParallelism() throws IOException{
         // order by parallelism is 3 originally, increase to 4
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "1000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "1000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
         pigServer.registerQuery("C = foreach B generate group as name, 
AVG(A.age) as age;");
         pigServer.registerQuery("D = order C by age;");
         pigServer.store("D", "output3");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output3"), new 
PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output3"), 
PART_FILE_FILTER);
         assertEquals(files.length, 4);
     }
 
     @Test
     public void testSkewedJoinDecreaseParallelism() throws IOException{
         // skewed join parallelism is 4 originally, reduce to 1
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as 
(name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 
'skewed';");
         pigServer.store("C", "output4");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output4"), new 
PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output4"), 
PART_FILE_FILTER);
         assertEquals(files.length, 1);
     }
 
     @Test
     public void testSkewedJoinIncreaseParallelism() throws IOException{
         // skewed join parallelism is 3 originally, increase to 5
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
"40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as 
(name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 
'skewed';");
         pigServer.store("C", "output5");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), new 
PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output5"), 
PART_FILE_FILTER);
         assertEquals(files.length, 5);
     }
 
@@ -288,23 +258,15 @@ public class TestTezAutoParallelism {
     public void testSkewedFullJoinIncreaseParallelism() throws IOException{
         // skewed full join parallelism take the initial setting, since the 
join vertex has a broadcast(sample) dependency,
         // which prevent it changing parallelism
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
"40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as 
(name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name full, B by name using 
'skewed';");
         pigServer.store("C", "output6");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output5"), new 
PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output5"), 
PART_FILE_FILTER);
         assertEquals(files.length, 5);
     }
 
@@ -312,9 +274,9 @@ public class TestTezAutoParallelism {
     public void testSkewedJoinIncreaseParallelismWithScalar() throws 
IOException{
         // skewed join parallelism take the initial setting, since the join 
vertex has a broadcast(scalar) dependency,
         // which prevent it changing parallelism
-        
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-        
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "3000");
-        
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "40000");
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
"40000");
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as 
(name:chararray, gender:chararray);");
         pigServer.registerQuery("C = join A by name, B by name using 
'skewed';");
@@ -324,19 +286,29 @@ public class TestTezAutoParallelism {
         pigServer.registerQuery("G = foreach C generate age/F.count, gender;");
         pigServer.store("G", "output7");
         FileSystem fs = cluster.getFileSystem();
-        FileStatus[] files = fs.listStatus(new Path("output7"), new 
PathFilter(){
-            @Override
-            public boolean accept(Path path) {
-                if (path.getName().startsWith("part")) {
-                    return true;
-                }
-                return false;
-            }
-        });
+        FileStatus[] files = fs.listStatus(new Path("output7"), 
PART_FILE_FILTER);
         assertEquals(files.length, 4);
     }
 
     @Test
+    public void testSkewedJoinRightInputAutoParallelism() throws IOException{
+        setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+        setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+        setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
"40000");
+        setProperty(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "1.0");
+        setProperty(TezConfiguration.TEZ_AM_LOG_LEVEL, "DEBUG");
+        pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
+        pigServer.registerQuery("B = load '" + INPUT_FILE1 + "' as 
(name:chararray, age:int);");
+        pigServer.registerQuery("B = FILTER B by name == 'Noah';");
+        pigServer.registerQuery("B1 = group B by name;");
+        pigServer.registerQuery("C = join A by name, B1 by group using 
'skewed';");
+        pigServer.store("C", "output8");
+        FileSystem fs = cluster.getFileSystem();
+        FileStatus[] files = fs.listStatus(new Path("output8"), 
PART_FILE_FILTER);
+        assertEquals(5, files.length);
+    }
+
+    @Test
     public void testFlattenParallelism() throws IOException{
         String outputDir = "/tmp/testFlattenParallelism";
         String script = "A = load '" + INPUT_FILE1 + "' as (name:chararray, 
age:int);"
@@ -423,9 +395,9 @@ public class TestTezAutoParallelism {
         // When there is a combiner operation involved user specified 
parallelism is overriden
         Util.createLogAppender("testAutoParallelism", writer, classesToLog);
         try {
-            
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION,
 "true");
-            
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE,
 "4000");
-            
pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
 "80000");
+            setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+            setProperty(MRConfiguration.MAX_SPLIT_SIZE, "4000");
+            setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
"80000");
             pigServer.setBatchOn();
             pigServer.registerScript(new 
ByteArrayInputStream(script.getBytes()));
             pigServer.executeBatch();
@@ -453,4 +425,12 @@ public class TestTezAutoParallelism {
             Util.deleteFile(cluster, outputDir);
         }
     }
+
+    private void setProperty(String property, String value) {
+        pigServer.getPigContext().getProperties().setProperty(property, value);
+    }
+
+    private void removeProperty(String property) {
+        pigServer.getPigContext().getProperties().remove(property);
+    }
 }


Reply via email to