Author: zly
Date: Thu Feb 16 21:26:32 2017
New Revision: 1783304
URL: http://svn.apache.org/viewvc?rev=1783304&view=rev
Log:
PIG-4899: The number of records of input file is calculated wrongly in spark
mode in multiquery case (Adam via Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1783304&r1=1783303&r2=1783304&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Thu Feb 16 21:26:32 2017
@@ -162,13 +162,8 @@ public class LoadConverter implements RD
private SparkEngineConf sparkEngineConf;
private boolean initialized;
- //LoadConverter#ToTupleFunction is executed more than once in multiquery case causing
- //invalid number of input records, 'skip' flag below indicates first load is finished.
- private boolean skip;
-
public ToTupleFunction(SparkEngineConf sparkEngineConf){
this.sparkEngineConf = sparkEngineConf;
-
}
@Override
@@ -177,14 +172,9 @@ public class LoadConverter implements RD
long partitionId = TaskContext.get().partitionId();
PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX,
Long.toString(partitionId));
- //We're in POSplit and already counted all input records,
- //in a multiquery case skip will be set to true after the first load is finished:
- if (sparkCounters != null && SparkPigStatusReporter.getInstance().getCounters().getCounter(counterGroupName, counterName).getValue() > 0) {
- skip=true;
- }
initialized = true;
}
- if (sparkCounters != null && disableCounter == false && skip == false) {
+ if (sparkCounters != null && disableCounter == false) {
sparkCounters.increment(counterGroupName, counterName, 1L);
}
return v1._2();
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1783304&r1=1783303&r2=1783304&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Thu Feb 16 21:26:32 2017
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -133,6 +134,9 @@ public class SparkPigStats extends PigSt
Map.Entry pairs = (Map.Entry)statIt.next();
LOG.info("\t" + pairs.getKey() + " : " + pairs.getValue());
}
+ for (InputStats inputStat : js.getInputs()){
+ LOG.info("\t"+inputStat.getDisplayString());
+ }
}
}
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1783304&r1=1783303&r2=1783304&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Thu Feb 16 21:26:32 2017
@@ -18,8 +18,12 @@
package org.apache.pig.tools.pigstats.spark;
-import org.apache.hadoop.mapred.JobConf;
+import java.util.List;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
@@ -97,7 +101,21 @@ public class SparkStatsUtil {
public static long getLoadSparkCounterValue(POLoad load) {
SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
- return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load));
+ int loadersCount = countCoLoadsIfInSplit(load,load.getParentPlan());
+ return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load))/loadersCount;
+ }
+
+ private static int countCoLoadsIfInSplit(PhysicalOperator op, PhysicalPlan pp){
+ List<PhysicalOperator> successors = pp.getSuccessors(op);
+ if (successors == null || successors.size()==0) return 1;
+ for (PhysicalOperator successor : successors){
+ if (successor instanceof POSplit){
+ return ((POSplit)successor).getPlans().size();
+ }else{
+ return countCoLoadsIfInSplit(successor,pp);
+ }
+ }
+ return 1;
}
public static boolean isJobSuccess(int jobID,