Author: rohini
Date: Wed Jan 6 22:25:18 2016
New Revision: 1723424
URL: http://svn.apache.org/viewvc?rev=1723424&view=rev
Log:
PIG-4757: Job stats on successfully read/output records wrong with multiple
inputs/outputs (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
pig/trunk/test/org/apache/pig/test/TestPigRunner.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan 6 22:25:18 2016
@@ -77,6 +77,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4757: Job stats on successfully read/output records wrong with multiple
+    inputs/outputs (rohini)
+
PIG-4769: UnionOptimizer hits errors when merging vertex group into split
(rohini)
PIG-4768: EvalFunc reporter is null in Tez (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
Wed Jan 6 22:25:18 2016
@@ -32,25 +32,34 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezTaskConfigurable;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.lib.MRReader;
import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueReader;
/**
* POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput
*/
-public class POSimpleTezLoad extends POLoad implements TezInput {
+public class POSimpleTezLoad extends POLoad implements TezInput, TezTaskConfigurable {
private static final long serialVersionUID = 1L;
+
private String inputKey;
- private MRInput input;
- private KeyValueReader reader;
+
+ private transient ProcessorContext processorContext;
+ private transient MRInput input;
+ private transient KeyValueReader reader;
private transient Configuration conf;
private transient boolean finished = false;
+ private transient TezCounter inputRecordCounter;
public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
super(k, loader);
@@ -69,6 +78,12 @@ public class POSimpleTezLoad extends POL
}
@Override
+ public void initialize(ProcessorContext processorContext)
+ throws ExecException {
+ this.processorContext = processorContext;
+ }
+
+ @Override
public void addInputsToSkip(Set<String> inputsToSkip) {
}
@@ -93,6 +108,22 @@ public class POSimpleTezLoad extends POL
} catch (IOException e) {
throw new ExecException(e);
}
+
+ // Multiple inputs - other broadcast input like replicate join table, order by sample.
+ // We use multi input counters to just get MRInput records count.
+ if (inputs.size() > 1) {
+ CounterGroup multiInputGroup = processorContext.getCounters()
+ .getGroup(MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+ if (multiInputGroup == null) {
+ processorContext.getCounters().addGroup(
+ MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
+ MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+ }
+ String name = MRPigStatsUtil.getMultiInputsCounterName(super.getLFile().getFileName(), 0);
+ if (name != null) {
+ inputRecordCounter = multiInputGroup.addCounter(name, name, 0);
+ }
+ }
}
/**
@@ -121,6 +152,9 @@ public class POSimpleTezLoad extends POL
Tuple next = (Tuple) reader.getCurrentValue();
res.result = next;
res.returnStatus = POStatus.STATUS_OK;
+ if (inputRecordCounter != null) {
+ inputRecordCounter.increment(1);
+ }
}
return res;
} catch (IOException e) {
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
Wed Jan 6 22:25:18 2016
@@ -46,6 +46,7 @@ public class POStoreTez extends POStore
private String outputKey;
+ private transient ProcessorContext processorContext;
private transient MROutput output;
private transient KeyValueWriter writer;
private transient TezCounter outputRecordCounter;
@@ -78,19 +79,7 @@ public class POStoreTez extends POStore
@Override
public void initialize(ProcessorContext processorContext)
throws ExecException {
- if (isMultiStore()) {
- CounterGroup multiStoreGroup = processorContext.getCounters()
- .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- if (multiStoreGroup == null) {
- processorContext.getCounters().addGroup(
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- }
- String name = MRPigStatsUtil.getMultiStoreCounterName(this);
- if (name != null) {
- outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
- }
- }
+ this.processorContext = processorContext;
}
@Override
@@ -112,6 +101,21 @@ public class POStoreTez extends POStore
} catch (IOException e) {
throw new ExecException(e);
}
+
+ // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
+ if (outputs.size() > 1) {
+ CounterGroup multiStoreGroup = processorContext.getCounters()
+ .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ if (multiStoreGroup == null) {
+ processorContext.getCounters().addGroup(
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ }
+ String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+ if (name != null) {
+ outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
+ }
+ }
}
@Override
@@ -123,9 +127,10 @@ public class POStoreTez extends POStore
if (illustrator == null) {
// PigOutputFormat.PigRecordWriter will call storeFunc.putNext
writer.write(null, res.result);
- } else
+ } else {
illustratorMarkup(res.result, res.result, 0);
- res = empty;
+ }
+ res = RESULT_EMPTY;
if (outputRecordCounter != null) {
outputRecordCounter.increment(1);
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Jan
6 22:25:18 2016
@@ -22,7 +22,7 @@ import static org.apache.pig.tools.pigst
import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -80,8 +80,8 @@ public class TezVertexStats extends JobS
private long activeSpillCountObj = 0;
private long activeSpillCountRecs = 0;
- private Map<String, Long> multiStoreCounters
-     = new HashMap<String, Long>();
+ private Map<String, Long> multiInputCounters = Maps.newHashMap();
+ private Map<String, Long> multiStoreCounters = Maps.newHashMap();
public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) {
super(name, plan);
@@ -217,18 +217,33 @@ public class TezVertexStats extends JobS
return;
}
+ Map<String, Long> mIGroup = counters.get(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+ if (mIGroup != null) {
+ multiInputCounters.putAll(mIGroup);
+ }
+
// There is always only one load in a Tez vertex
for (FileSpec fs : loads) {
long records = -1;
long hdfsBytesRead = -1;
String filename = fs.getFileName();
if (counters != null) {
- Map<String, Long> taskCounter = counters.get(TASK_COUNTER_GROUP);
- if (taskCounter != null
- && taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
- records = taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
- numInputRecords = records;
+ if (mIGroup != null) {
+ Long n = mIGroup.get(PigStatsUtil.getMultiInputsCounterName(fs.getFileName(), 0));
+ if (n != null) records = n;
+ }
+ if (records == -1) {
+ Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+ if (taskCounters != null
+ && taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
+ records = taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
+ }
+ }
+ if (isSuccessful() && records == -1) {
+ // Tez removes 0 value counters for efficiency.
+ records = 0;
}
+ numInputRecords = records;
if (counters.get(FS_COUNTER_GROUP) != null && counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
hdfsBytesRead = counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
@@ -246,6 +261,11 @@ public class TezVertexStats extends JobS
return;
}
+ Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ if (msGroup != null) {
+ multiStoreCounters.putAll(msGroup);
+ }
+
for (POStore sto : stores) {
if (sto.isTmpStore()) {
continue;
@@ -254,16 +274,20 @@ public class TezVertexStats extends JobS
long hdfsBytesWritten = -1;
String filename = sto.getSFile().getFileName();
if (counters != null) {
- if (sto.isMultiStore()) {
- Map<String, Long> msGroup = counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- if (msGroup != null) {
- multiStoreCounters.putAll(msGroup);
- Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
- if (n != null) records = n;
+ if (msGroup != null) {
+ Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
+ if (n != null) records = n;
+ }
+ if (records == -1) {
+ Map<String, Long> taskCounters = counters.get(TASK_COUNTER_GROUP);
+ if (taskCounters != null
+ && taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+ records = taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
}
- } else if (counters.get(TASK_COUNTER_GROUP) != null
- && counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
- records = counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
+ }
+ if (isSuccessful() && records == -1) {
+ // Tez removes 0 value counters for efficiency.
+ records = 0;
}
if (records != -1) {
numOutputRecords += records;
@@ -385,7 +409,7 @@ public class TezVertexStats extends JobS
@Override
@Deprecated
public Map<String, Long> getMultiStoreCounters() {
- return multiStoreCounters;
+ return Collections.unmodifiableMap(multiStoreCounters);
}
@Override
Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jan 6 22:25:18
2016
@@ -26,6 +26,9 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -35,7 +38,6 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigRunner;
import org.apache.pig.PigRunner.ReturnCode;
@@ -45,7 +47,6 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.newplan.Operator;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
@@ -394,6 +395,89 @@ public class TestPigRunner {
Util.deleteFile(cluster, OUTPUT_FILE);
Util.deleteFile(cluster, OUTPUT_FILE_2);
}
+ }
+
+ @Test
+ public void simpleMultiQueryTest3() throws Exception {
+ final String INPUT_FILE_2 = "input2";
+ final String OUTPUT_FILE_2 = "output2";
+
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_2));
+ w.println("3\t4\t5");
+ w.println("5\t6\t7");
+ w.println("3\t7\t8");
+ w.close();
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2);
+ new File(INPUT_FILE_2).delete();
+
+ w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int,
a2:int);");
+ w.println("A1 = load '" + INPUT_FILE_2 + "' as (a0:int, a1:int,
a2:int);");
+ w.println("B = filter A by a0 == 3;");
+ w.println("C = filter A by a1 <=5;");
+ w.println("D = join C by a0, B by a0, A1 by a0 using 'replicated';");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.println("store D into '" + OUTPUT_FILE_2 + "';");
+ w.close();
+
+ try {
+ String[] args = { "-x", execType, PIG_FILE };
+ PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
+ assertTrue(stats.isSuccessful());
+ if (Util.isMapredExecType(cluster.getExecType())) {
+ assertEquals(3, stats.getJobGraph().size());
+ } else {
+ assertEquals(1, stats.getJobGraph().size());
+ }
+
+ // Each output file should include the following:
+ // output:
+ // 1\t2\t3\n
+ // 5\t3\t4\n
+ // 3\t4\t5\n
+ // output2:
+ // 3\t4\t5\t3\t4\t5\t3\t4\t5\n
+ // 3\t4\t5\t3\t4\t5\t3\t7\t8\n
+ // 3\t4\t5\t3\t7\t8\t3\t4\t5\n
+ // 3\t4\t5\t3\t4\t5\t3\t7\t8\n
+ final int numOfRecords1 = 3;
+ final int numOfRecords2 = 4;
+ final int numOfBytesWritten1 = 18;
+ final int numOfBytesWritten2 = 72;
+
+ assertEquals(numOfRecords1 + numOfRecords2, stats.getRecordWritten());
+ assertEquals(numOfBytesWritten1 + numOfBytesWritten2, stats.getBytesWritten());
+
+ List<String> outputNames = new ArrayList<String>(stats.getOutputNames());
+ assertTrue(outputNames.size() == 2);
+ Collections.sort(outputNames);
+ assertEquals(OUTPUT_FILE, outputNames.get(0));
+ assertEquals(OUTPUT_FILE_2, outputNames.get(1));
+ assertEquals(3, stats.getNumberRecords(OUTPUT_FILE));
+ assertEquals(4, stats.getNumberRecords(OUTPUT_FILE_2));
+
+ List<InputStats> inputStats = new ArrayList<InputStats>(stats.getInputStats());
+ assertTrue(inputStats.size() == 2);
+ Collections.sort(inputStats, new Comparator<InputStats>() {
+ @Override
+ public int compare(InputStats o1, InputStats o2) {
+ return o1.getLocation().compareTo(o2.getLocation());
+ }
+ });
+ assertEquals(5, inputStats.get(0).getNumberRecords());
+ assertEquals(3, inputStats.get(1).getNumberRecords());
+ // For mapreduce, bytes read is wrong since hdfs bytes read includes replicated tables
+ // Since Tez has only one load per job, its values are correct
+ if (!Util.isMapredExecType(cluster.getExecType())) {
+ assertEquals(30, inputStats.get(0).getBytes());
+ assertEquals(18, inputStats.get(1).getBytes());
+ }
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, INPUT_FILE_2);
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, OUTPUT_FILE_2);
+ }
}
@Test