Author: rohini
Date: Sat May 23 06:46:38 2015
New Revision: 1681271

URL: http://svn.apache.org/r1681271
Log:
PIG-4569: Fix e2e test Rank_1 failure (rohini)

Modified:
    pig/branches/branch-0.15/CHANGES.txt
    
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java

Modified: pig/branches/branch-0.15/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/CHANGES.txt?rev=1681271&r1=1681270&r2=1681271&view=diff
==============================================================================
--- pig/branches/branch-0.15/CHANGES.txt (original)
+++ pig/branches/branch-0.15/CHANGES.txt Sat May 23 06:46:38 2015
@@ -72,6 +72,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4569: Fix e2e test Rank_1 failure (rohini)
+
 PIG-4490: MIN/MAX builtin UDFs return wrong results when accumulating for strings (xplenty via rohini)
 
 PIG-4418: NullPointerException in JVMReuseImpl (rohini)

Modified: 
pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: 
http://svn.apache.org/viewvc/pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1681271&r1=1681270&r2=1681271&view=diff
==============================================================================
--- pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/branches/branch-0.15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Sat May 23 06:46:38 2015
@@ -38,6 +38,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
+import java.util.TreeMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -187,7 +188,6 @@ public class JobControlCompiler{
     private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
 
     private Map<Job, MapReduceOper> jobMroMap;
-    private int counterSize;
 
     public JobControlCompiler(PigContext pigContext, Configuration conf) {
         this(pigContext, conf, null);
@@ -378,6 +378,7 @@ public class JobControlCompiler{
         Counters counters;
         Group groupCounters;
 
+        int counterSize = -1;
         Long previousValue = 0L;
         Long previousSum = 0L;
         ArrayList<Pair<String,Long>> counterPairs;
@@ -403,21 +404,28 @@ public class JobControlCompiler{
             }
             groupCounters = counters.getGroup(groupName);
 
-            HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
+            TreeMap<Integer,Long> counterList = new TreeMap<Integer, Long>();
 
-            int numTasks = isRowNumber ? job.getJobConf().getNumMapTasks() : job.getJobConf().getNumReduceTasks();
-            for ( int i=0; i < numTasks; i++ ) {
-                Long value = groupCounters.getCounter(Integer.toString(i));
-                counterList.put(i, value);
+            Iterator<Counter> it = groupCounters.iterator();
+            while (it.hasNext()) {
+                try {
+                    Counter c = it.next();
+                    counterList.put(Integer.valueOf(c.getDisplayName()), c.getValue());
+                } catch (Exception ex) {
+                    ex.printStackTrace();
+                }
             }
 
             counterSize = counterList.size();
             counterPairs = new ArrayList<Pair<String,Long>>();
 
-            for(int i = 0; i < counterSize; i++){
+            // There could be empty tasks with no counters. That is not an issue
+            // and we only need to calculate offsets for non-empty task ids
+            // which will be accessed in PORank.
+            for (Entry<Integer, Long> entry : counterList.entrySet()) {
                 previousSum += previousValue;
-                previousValue = counterList.get(Integer.valueOf(i));
-                counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + i, previousSum));
+                previousValue = entry.getValue();
+                counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + entry.getKey(), previousSum));
             }
 
             globalCounters.put(operationID, counterPairs);


Reply via email to