Author: rohini
Date: Sat May 23 06:43:58 2015
New Revision: 1681270
URL: http://svn.apache.org/r1681270
Log:
PIG-4569: Fix e2e test Rank_1 failure (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1681270&r1=1681269&r2=1681270&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sat May 23 06:43:58 2015
@@ -80,6 +80,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4569: Fix e2e test Rank_1 failure (rohini)
+
PIG-4490: MIN/MAX builtin UDFs return wrong results when accumulating for
strings (xplenty via rohini)
PIG-4418: NullPointerException in JVMReuseImpl (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1681270&r1=1681269&r2=1681270&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Sat May 23 06:43:58 2015
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
+import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -187,7 +188,6 @@ public class JobControlCompiler{
private Map<Job, Pair<List<POStore>, Path>> jobStoreMap;
private Map<Job, MapReduceOper> jobMroMap;
- private int counterSize;
public JobControlCompiler(PigContext pigContext, Configuration conf) {
this(pigContext, conf, null);
@@ -378,6 +378,7 @@ public class JobControlCompiler{
Counters counters;
Group groupCounters;
+ int counterSize = -1;
Long previousValue = 0L;
Long previousSum = 0L;
ArrayList<Pair<String,Long>> counterPairs;
@@ -403,21 +404,28 @@ public class JobControlCompiler{
}
groupCounters = counters.getGroup(groupName);
- HashMap<Integer,Long> counterList = new HashMap<Integer, Long>();
+ TreeMap<Integer,Long> counterList = new TreeMap<Integer, Long>();
- int numTasks = isRowNumber ? job.getJobConf().getNumMapTasks() : job.getJobConf().getNumReduceTasks();
- for ( int i=0; i < numTasks; i++ ) {
- Long value = groupCounters.getCounter(Integer.toString(i));
- counterList.put(i, value);
+ Iterator<Counter> it = groupCounters.iterator();
+ while (it.hasNext()) {
+ try {
+ Counter c = it.next();
+ counterList.put(Integer.valueOf(c.getDisplayName()), c.getValue());
+ } catch (Exception ex) {
+ ex.printStackTrace();
+ }
}
counterSize = counterList.size();
counterPairs = new ArrayList<Pair<String,Long>>();
- for(int i = 0; i < counterSize; i++){
+ // There could be empty tasks with no counters. That is not an issue
+ // and we only need to calculate offsets for non-empty task ids
+ // which will be accessed in PORank.
+ for (Entry<Integer, Long> entry : counterList.entrySet()) {
previousSum += previousValue;
- previousValue = counterList.get(Integer.valueOf(i));
- counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + i, previousSum));
+ previousValue = entry.getValue();
+ counterPairs.add(new Pair<String, Long>(JobControlCompiler.PIG_MAP_COUNTER + operationID + JobControlCompiler.PIG_MAP_SEPARATOR + entry.getKey(), previousSum));
}
globalCounters.put(operationID, counterPairs);