Author: xuefu
Date: Sat Feb  6 05:12:59 2016
New Revision: 1728791

URL: http://svn.apache.org/viewvc?rev=1728791&view=rev
Log:
PIG-4784: Enable pig.disable.counter for spark engine (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Sat Feb  6 05:12:59 2016
@@ -96,7 +96,8 @@ public class LoadConverter implements RD
         ToTupleFunction ttf = new ToTupleFunction();
 
         //create SparkCounter and set it for ToTupleFunction
-        if (!op.isTmpLoad()) {
+        boolean disableCounter = jobConf.getBoolean("pig.disable.counter", false);
+        if (!op.isTmpLoad() && !disableCounter) {
             String counterName = SparkStatsUtil.getLoadSparkCounterName(op);
             SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
             if (counterReporter.getCounters() != null) {
@@ -105,6 +106,7 @@ public class LoadConverter implements RD
                         counterName);
             }
 
+            ttf.setDisableCounter(disableCounter);
             ttf.setCounterGroupName(SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP);
             ttf.setCounterName(counterName);
             ttf.setSparkCounters(SparkPigStatusReporter.getInstance().getCounters());
@@ -137,10 +139,11 @@ public class LoadConverter implements RD
         private String counterGroupName;
         private String counterName;
         private SparkCounters sparkCounters;
+        private boolean disableCounter;
 
         @Override
         public Tuple apply(Tuple2<Text, Tuple> v1) {
-            if (sparkCounters != null) {
+            if (sparkCounters != null && disableCounter == false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }
             return v1._2();
@@ -157,6 +160,10 @@ public class LoadConverter implements RD
         public void setSparkCounters(SparkCounters sparkCounters) {
             this.sparkCounters = sparkCounters;
         }
+
+        public void setDisableCounter(boolean disableCounter) {
+            this.disableCounter = disableCounter;
+        }
     }
 
     /**

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/StoreConverter.java Sat Feb  6 05:12:59 2016
@@ -137,10 +137,11 @@ public class StoreConverter implements
         private String counterGroupName;
         private String counterName;
         private SparkCounters sparkCounters;
+        private boolean disableCounter;
 
 
         public Tuple2<Text, Tuple> call(Tuple v1) {
-            if (sparkCounters != null) {
+            if (sparkCounters != null && disableCounter == false) {
                 sparkCounters.increment(counterGroupName, counterName, 1L);
             }
             return new Tuple2<Text, Tuple>(EMPTY_TEXT, v1);
@@ -157,11 +158,17 @@ public class StoreConverter implements
         public void setSparkCounters(SparkCounters sparkCounter) {
             this.sparkCounters = sparkCounter;
         }
+
+        public void setDisableCounter(boolean disableCounter) {
+            this.disableCounter = disableCounter;
+        }
     }
 
     private FromTupleFunction buildFromTupleFunction(POStore op) {
         FromTupleFunction ftf = new FromTupleFunction();
-        if (!op.isTmpStore()) {
+        boolean disableCounter = op.disableCounter();
+        if (!op.isTmpStore() && !disableCounter) {
+            ftf.setDisableCounter(disableCounter);
             ftf.setCounterGroupName(SparkStatsUtil.SPARK_STORE_COUNTER_GROUP);
             ftf.setCounterName(SparkStatsUtil.getStoreSparkCounterName(op));
             SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Sat Feb  6 05:12:59 2016
@@ -46,6 +46,7 @@ public class SparkJobStats extends JobSt
 
     private int jobId;
     private Map<String, Long> stats = Maps.newLinkedHashMap();
+    private boolean disableCounter;
 
     protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
         this(String.valueOf(jobId), plan, conf);
@@ -57,11 +58,19 @@ public class SparkJobStats extends JobSt
         setConf(conf);
     }
 
+    public void setConf(Configuration conf) {
+        super.setConf(conf);
+        disableCounter = conf.getBoolean("pig.disable.counter", false);
+    }
+
     public void addOutputInfo(POStore poStore, boolean success,
                               JobMetricsListener jobMetricsListener) {
         if (!poStore.isTmpStore()) {
             long bytes = getOutputSize(poStore, conf);
-            long recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore);
+            long recordsCount = -1;
+            if (disableCounter == false) {
+                recordsCount = SparkStatsUtil.getStoreSparkCounterValue(poStore);
+            }
             OutputStats outputStats = new OutputStats(poStore.getSFile().getFileName(),
                     bytes, recordsCount, success);
             outputStats.setPOStore(poStore);
@@ -74,7 +83,10 @@ public class SparkJobStats extends JobSt
     public void addInputStats(POLoad po, boolean success,
                               boolean singleInput) {
 
-        long recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+        long recordsCount = -1;
+        if (disableCounter == false) {
+            recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+        }
         long bytesRead = -1;
         if (singleInput && stats.get("BytesRead") != null) {
             bytesRead = stats.get("BytesRead");

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1728791&r1=1728790&r2=1728791&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Sat Feb  6 05:12:59 2016
@@ -876,7 +876,13 @@ public class TestPigRunner {
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(1, outputs.size());
             OutputStats outstats = outputs.get(0);
-            assertEquals(9, outstats.getNumberRecords());
+            //In spark mode, if pig.disable.counter = true, the number of records of the
+            //output are not calculated.
+            if (execType.equals("spark")) {
+                assertEquals(-1, outstats.getNumberRecords());
+            } else {
+                assertEquals(9, outstats.getNumberRecords());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);
@@ -958,8 +964,12 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-
-            assertEquals(1, stats.getNumberJobs());
+            if (execType.equals("spark")) {
+                //2 POStore generates 2 spark jobs
+                assertEquals(2, stats.getNumberJobs());
+            } else {
+                assertEquals(1, stats.getNumberJobs());
+            }
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
             if (execType.equals("tez")) {
@@ -975,7 +985,11 @@ public class TestPigRunner {
             List<InputStats> inputs = stats.getInputStats();
             assertEquals(1, inputs.size());
             InputStats instats = inputs.get(0);
-            assertEquals(5, instats.getNumberRecords());
+            if (execType.equals("spark")) {
+                assertEquals(-1, instats.getNumberRecords());
+            } else {
+                assertEquals(5, instats.getNumberRecords());
+            }
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);


Reply via email to