Author: xuefu
Date: Wed Nov 23 14:09:59 2016
New Revision: 1770973
URL: http://svn.apache.org/viewvc?rev=1770973&view=rev
Log:
PIG-4899: The number of input-file records is miscalculated in Spark
mode in the multiquery case (Adam Szita via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1770973&r1=1770972&r2=1770973&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Wed Nov 23 14:09:59 2016
@@ -162,6 +162,10 @@ public class LoadConverter implements RD
private SparkEngineConf sparkEngineConf;
private boolean initialized;
+ //LoadConverter#ToTupleFunction is executed more than once in
multiquery case causing
+ //invalid number of input records, 'skip' flag below indicates first
load is finished.
+ private boolean skip;
+
public ToTupleFunction(SparkEngineConf sparkEngineConf){
this.sparkEngineConf = sparkEngineConf;
@@ -172,9 +176,15 @@ public class LoadConverter implements RD
if (!initialized) {
long partitionId = TaskContext.get().partitionId();
PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX,
Long.toString(partitionId));
+
+ //We're in POSplit and already counted all input records,
+ //in a multiquery case skip will be set to true after the
first load is finished:
+ if (sparkCounters != null &&
SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName,
counterName).getValue() > 0) {
+ skip=true;
+ }
initialized = true;
}
- if (sparkCounters != null && disableCounter == false) {
+ if (sparkCounters != null && disableCounter == false && skip ==
false) {
sparkCounters.increment(counterGroupName, counterName, 1L);
}
return v1._2();
Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1770973&r1=1770972&r2=1770973&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Wed Nov 23
14:09:59 2016
@@ -32,6 +32,8 @@ import org.apache.pig.builtin.mock.Stora
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.tools.pigstats.InputStats;
+
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -880,6 +882,27 @@ public class TestMultiQuery {
Util.checkQueryOutputsAfterSort(actualResults.iterator(),
expectedResults);
}
+ @Test
+ public void testMultiQueryJiraPig4899() throws Exception { // PIG-4899: one load feeds two stores; each job must report the true input record count
+ myPig.setBatchOn(); // queue the stores below so they run together in executeBatch()
+
+ myPig.registerQuery("a = load 'passwd' "
+ "using PigStorage(':') as (uname:chararray,
passwd:chararray, uid:int, gid:int);"); // single shared load consumed by both b1 and b2
+ myPig.registerQuery("b1 = foreach a generate uname;");
+ myPig.registerQuery("b2 = foreach a generate uid;");
+ myPig.registerQuery("store b1 into 'output1';");
+ myPig.registerQuery("store b2 into 'output2';"); // second store on the same relation 'a' makes this a multi-query batch
+
+ List<ExecJob> jobs = myPig.executeBatch(); // executes every store registered since setBatchOn()
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ List<InputStats> stats = job.getStatistics().getInputStats();
+ assertEquals(1,stats.size()); // each job should see exactly one input (the 'passwd' load)
+ InputStats stat = stats.get(0);
+ assertEquals("Number of records in passwd file is
14",14,stat.getNumberRecords()); // NOTE(review): 14 must match the 'passwd' fixture; before PIG-4899 the load could be counted more than once
+ }
+ }
+
//
--------------------------------------------------------------------------
// Helper methods