Author: xuefu
Date: Sat Feb 6 05:12:59 2016
New Revision: 1728791
URL: http://svn.apache.org/viewvc?rev=1728791&view=rev
Log:
PIG-4784: Enable pig.disable.counter for spark engine (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
Sat Feb 6 05:12:59 2016
@@ -96,7 +96,8 @@ public class LoadConverter implements RD
ToTupleFunction ttf = new ToTupleFunction();
//create SparkCounter and set it for ToTupleFunction
- if (!op.isTmpLoad()) {
+ boolean disableCounter = jobConf.getBoolean("pig.disable.counter",
false);
+ if (!op.isTmpLoad() && !disableCounter) {
String counterName = SparkStatsUtil.getLoadSparkCounterName(op);
SparkPigStatusReporter counterReporter =
SparkPigStatusReporter.getInstance();
if (counterReporter.getCounters() != null) {
@@ -105,6 +106,7 @@ public class LoadConverter implements RD
counterName);
}
+ ttf.setDisableCounter(disableCounter);
ttf.setCounterGroupName(SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP);
ttf.setCounterName(counterName);
ttf.setSparkCounters(SparkPigStatusReporter.getInstance().getCounters());
@@ -137,10 +139,11 @@ public class LoadConverter implements RD
private String counterGroupName;
private String counterName;
private SparkCounters sparkCounters;
+ private boolean disableCounter;
@Override
public Tuple apply(Tuple2<Text, Tuple> v1) {
- if (sparkCounters != null) {
+ if (sparkCounters != null && disableCounter == false) {
sparkCounters.increment(counterGroupName, counterName, 1L);
}
return v1._2();
@@ -157,6 +160,10 @@ public class LoadConverter implements RD
public void setSparkCounters(SparkCounters sparkCounters) {
this.sparkCounters = sparkCounters;
}
+
+ public void setDisableCounter(boolean disableCounter) {
+ this.disableCounter = disableCounter;
+ }
}
/**
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
(original)
+++
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
Sat Feb 6 05:12:59 2016
@@ -137,10 +137,11 @@ public class StoreConverter implements
private String counterGroupName;
private String counterName;
private SparkCounters sparkCounters;
+ private boolean disableCounter;
public Tuple2<Text, Tuple> call(Tuple v1) {
- if (sparkCounters != null) {
+ if (sparkCounters != null && disableCounter == false) {
sparkCounters.increment(counterGroupName, counterName, 1L);
}
return new Tuple2<Text, Tuple>(EMPTY_TEXT, v1);
@@ -157,11 +158,17 @@ public class StoreConverter implements
public void setSparkCounters(SparkCounters sparkCounter) {
this.sparkCounters = sparkCounter;
}
+
+ public void setDisableCounter(boolean disableCounter) {
+ this.disableCounter = disableCounter;
+ }
}
private FromTupleFunction buildFromTupleFunction(POStore op) {
FromTupleFunction ftf = new FromTupleFunction();
- if (!op.isTmpStore()) {
+ boolean disableCounter = op.disableCounter();
+ if (!op.isTmpStore() && !disableCounter) {
+ ftf.setDisableCounter(disableCounter);
ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP);
ftf.setCounterName(SparkStatsUtil.getStoreSparkCounterName(op));
SparkPigStatusReporter counterReporter =
SparkPigStatusReporter.getInstance();
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
---
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
(original)
+++
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
Sat Feb 6 05:12:59 2016
@@ -46,6 +46,7 @@ public class SparkJobStats extends JobSt
private int jobId;
private Map<String, Long> stats = Maps.newLinkedHashMap();
+ private boolean disableCounter;
protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration
conf) {
this(String.valueOf(jobId), plan, conf);
@@ -57,11 +58,19 @@ public class SparkJobStats extends JobSt
setConf(conf);
}
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ disableCounter = conf.getBoolean("pig.disable.counter", false);
+ }
+
public void addOutputInfo(POStore poStore, boolean success,
JobMetricsListener jobMetricsListener) {
if (!poStore.isTmpStore()) {
long bytes = getOutputSize(poStore, conf);
- long recordsCount =
SparkStatsUtil.getStoreSparkCounterValue(poStore);
+ long recordsCount = -1;
+ if (disableCounter == false) {
+ recordsCount =
SparkStatsUtil.getStoreSparkCounterValue(poStore);
+ }
OutputStats outputStats = new
OutputStats(poStore.getSFile().getFileName(),
bytes, recordsCount, success);
outputStats.setPOStore(poStore);
@@ -74,7 +83,10 @@ public class SparkJobStats extends JobSt
public void addInputStats(POLoad po, boolean success,
boolean singleInput) {
- long recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+ long recordsCount = -1;
+ if (disableCounter == false) {
+ recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+ }
long bytesRead = -1;
if (singleInput && stats.get("BytesRead") != null) {
bytesRead = stats.get("BytesRead");
Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL:
http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Sat Feb 6
05:12:59 2016
@@ -876,7 +876,13 @@ public class TestPigRunner {
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(1, outputs.size());
OutputStats outstats = outputs.get(0);
- assertEquals(9, outstats.getNumberRecords());
+ //In spark mode, if pig.disable.counter = true, the number of records of
+ //the output is not calculated.
+ if (execType.equals("spark")) {
+ assertEquals(-1, outstats.getNumberRecords());
+ } else {
+ assertEquals(9, outstats.getNumberRecords());
+ }
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);
@@ -958,8 +964,12 @@ public class TestPigRunner {
PigStats stats = PigRunner.run(args, new
TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
-
- assertEquals(1, stats.getNumberJobs());
+ if (execType.equals("spark")) {
+ //2 POStores generate 2 spark jobs
+ assertEquals(2, stats.getNumberJobs());
+ } else {
+ assertEquals(1, stats.getNumberJobs());
+ }
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
if (execType.equals("tez")) {
@@ -975,7 +985,11 @@ public class TestPigRunner {
List<InputStats> inputs = stats.getInputStats();
assertEquals(1, inputs.size());
InputStats instats = inputs.get(0);
- assertEquals(5, instats.getNumberRecords());
+ if (execType.equals("spark")) {
+ assertEquals(-1, instats.getNumberRecords());
+ } else {
+ assertEquals(5, instats.getNumberRecords());
+ }
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);