Author: rohini
Date: Wed Jan  6 22:25:18 2016
New Revision: 1723424

URL: http://svn.apache.org/viewvc?rev=1723424&view=rev
Log:
PIG-4757: Job stats on successfully read/output records wrong with multiple 
inputs/outputs (rohini)

Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
    pig/trunk/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Jan  6 22:25:18 2016
@@ -77,6 +77,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4757: Job stats on successfully read/output records wrong with multiple 
inputs/outputs (rohini)
+
 PIG-4769: UnionOptimizer hits errors when merging vertex group into split 
(rohini)
 
 PIG-4768: EvalFunc reporter is null in Tez (rohini)

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POSimpleTezLoad.java
 Wed Jan  6 22:25:18 2016
@@ -32,25 +32,34 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import 
org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
+import 
org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezTaskConfigurable;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.tez.common.counters.CounterGroup;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.lib.MRReader;
 import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
  * POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput
  */
-public class POSimpleTezLoad extends POLoad implements TezInput {
+public class POSimpleTezLoad extends POLoad implements TezInput, 
TezTaskConfigurable {
 
     private static final long serialVersionUID = 1L;
+
     private String inputKey;
-    private MRInput input;
-    private KeyValueReader reader;
+
+    private transient ProcessorContext processorContext;
+    private transient MRInput input;
+    private transient KeyValueReader reader;
     private transient Configuration conf;
     private transient boolean finished = false;
+    private transient TezCounter inputRecordCounter;
 
     public POSimpleTezLoad(OperatorKey k, LoadFunc loader) {
         super(k, loader);
@@ -69,6 +78,12 @@ public class POSimpleTezLoad extends POL
     }
 
     @Override
+    public void initialize(ProcessorContext processorContext)
+            throws ExecException {
+        this.processorContext = processorContext;
+    }
+
+    @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
     }
 
@@ -93,6 +108,22 @@ public class POSimpleTezLoad extends POL
         } catch (IOException e) {
             throw new ExecException(e);
         }
+
+        // Multiple inputs - other broadcast input like replicate join table, 
order by sample.
+        // We use multi input counters to just get MRInput records count.
+        if (inputs.size() > 1) {
+            CounterGroup multiInputGroup = processorContext.getCounters()
+                    .getGroup(MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+            if (multiInputGroup == null) {
+                processorContext.getCounters().addGroup(
+                        MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP,
+                        MRPigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+            }
+            String name = 
MRPigStatsUtil.getMultiInputsCounterName(super.getLFile().getFileName(), 0);
+            if (name != null) {
+                inputRecordCounter = multiInputGroup.addCounter(name, name, 0);
+            }
+        }
     }
 
     /**
@@ -121,6 +152,9 @@ public class POSimpleTezLoad extends POL
                 Tuple next = (Tuple) reader.getCurrentValue();
                 res.result = next;
                 res.returnStatus = POStatus.STATUS_OK;
+                if (inputRecordCounter != null) {
+                    inputRecordCounter.increment(1);
+                }
             }
             return res;
         } catch (IOException e) {

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
 Wed Jan  6 22:25:18 2016
@@ -46,6 +46,7 @@ public class POStoreTez extends POStore
 
     private String outputKey;
 
+    private transient ProcessorContext processorContext;
     private transient MROutput output;
     private transient KeyValueWriter writer;
     private transient TezCounter outputRecordCounter;
@@ -78,19 +79,7 @@ public class POStoreTez extends POStore
     @Override
     public void initialize(ProcessorContext processorContext)
             throws ExecException {
-        if (isMultiStore()) {
-            CounterGroup multiStoreGroup = processorContext.getCounters()
-                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            if (multiStoreGroup == null) {
-                processorContext.getCounters().addGroup(
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
-                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-            }
-            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
-            if (name != null) {
-                outputRecordCounter = multiStoreGroup.addCounter(name, name, 
0);
-            }
-        }
+        this.processorContext = processorContext;
     }
 
     @Override
@@ -112,6 +101,21 @@ public class POStoreTez extends POStore
         } catch (IOException e) {
             throw new ExecException(e);
         }
+
+        // Multiple outputs - can be another store or other outputs (shuffle, 
broadcast)
+        if (outputs.size() > 1) {
+            CounterGroup multiStoreGroup = processorContext.getCounters()
+                    .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            if (multiStoreGroup == null) {
+                processorContext.getCounters().addGroup(
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+                        MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+            }
+            String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+            if (name != null) {
+                outputRecordCounter = multiStoreGroup.addCounter(name, name, 
0);
+            }
+        }
     }
 
     @Override
@@ -123,9 +127,10 @@ public class POStoreTez extends POStore
                 if (illustrator == null) {
                     // PigOutputFormat.PigRecordWriter will call 
storeFunc.putNext
                     writer.write(null, res.result);
-                } else
+                } else {
                     illustratorMarkup(res.result, res.result, 0);
-                res = empty;
+                }
+                res = RESULT_EMPTY;
 
                 if (outputRecordCounter != null) {
                     outputRecordCounter.increment(1);

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java 
(original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Jan 
 6 22:25:18 2016
@@ -22,7 +22,7 @@ import static org.apache.pig.tools.pigst
 import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -80,8 +80,8 @@ public class TezVertexStats extends JobS
     private long activeSpillCountObj = 0;
     private long activeSpillCountRecs = 0;
 
-    private Map<String, Long> multiStoreCounters
-            = new HashMap<String, Long>();
+    private Map<String, Long> multiInputCounters = Maps.newHashMap();
+    private Map<String, Long> multiStoreCounters = Maps.newHashMap();
 
     public TezVertexStats(String name, JobGraph plan, boolean isMapOpts) {
         super(name, plan);
@@ -217,18 +217,33 @@ public class TezVertexStats extends JobS
             return;
         }
 
+        Map<String, Long> mIGroup = 
counters.get(PigStatsUtil.MULTI_INPUTS_COUNTER_GROUP);
+        if (mIGroup != null) {
+            multiInputCounters.putAll(mIGroup);
+        }
+
         // There is always only one load in a Tez vertex
         for (FileSpec fs : loads) {
             long records = -1;
             long hdfsBytesRead = -1;
             String filename = fs.getFileName();
             if (counters != null) {
-                Map<String, Long> taskCounter = 
counters.get(TASK_COUNTER_GROUP);
-                if (taskCounter != null
-                        && 
taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
-                    records = 
taskCounter.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
-                    numInputRecords = records;
+                if (mIGroup != null) {
+                    Long n = 
mIGroup.get(PigStatsUtil.getMultiInputsCounterName(fs.getFileName(), 0));
+                    if (n != null) records = n;
+                }
+                if (records == -1) {
+                    Map<String, Long> taskCounters = 
counters.get(TASK_COUNTER_GROUP);
+                    if (taskCounters != null
+                            && 
taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name()) != null) {
+                        records = 
taskCounters.get(TaskCounter.INPUT_RECORDS_PROCESSED.name());
+                    }
+                }
+                if (isSuccessful() && records == -1) {
+                    // Tez removes 0 value counters for efficiency.
+                    records = 0;
                 }
+                numInputRecords = records;
                 if (counters.get(FS_COUNTER_GROUP) != null &&
                         
counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ) != null) {
                     hdfsBytesRead = 
counters.get(FS_COUNTER_GROUP).get(PigStatsUtil.HDFS_BYTES_READ);
@@ -246,6 +261,11 @@ public class TezVertexStats extends JobS
             return;
         }
 
+        Map<String, Long> msGroup = 
counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        if (msGroup != null) {
+            multiStoreCounters.putAll(msGroup);
+        }
+
         for (POStore sto : stores) {
             if (sto.isTmpStore()) {
                 continue;
@@ -254,16 +274,20 @@ public class TezVertexStats extends JobS
             long hdfsBytesWritten = -1;
             String filename = sto.getSFile().getFileName();
             if (counters != null) {
-                if (sto.isMultiStore()) {
-                    Map<String, Long> msGroup = 
counters.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
-                    if (msGroup != null) {
-                        multiStoreCounters.putAll(msGroup);
-                        Long n = 
msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
-                        if (n != null) records = n;
+                if (msGroup != null) {
+                    Long n = 
msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
+                    if (n != null) records = n;
+                }
+                if (records == -1) {
+                    Map<String, Long> taskCounters = 
counters.get(TASK_COUNTER_GROUP);
+                    if (taskCounters != null
+                            && 
taskCounters.get(TaskCounter.OUTPUT_RECORDS.name()) != null) {
+                        records = 
taskCounters.get(TaskCounter.OUTPUT_RECORDS.name());
                     }
-                } else if (counters.get(TASK_COUNTER_GROUP) != null
-                        && 
counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != 
null) {
-                    records = 
counters.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name());
+                }
+                if (isSuccessful() && records == -1) {
+                    // Tez removes 0 value counters for efficiency.
+                    records = 0;
                 }
                 if (records != -1) {
                     numOutputRecords += records;
@@ -385,7 +409,7 @@ public class TezVertexStats extends JobS
     @Override
     @Deprecated
     public Map<String, Long> getMultiStoreCounters() {
-        return multiStoreCounters;
+        return Collections.unmodifiableMap(multiStoreCounters);
     }
 
     @Override

Modified: pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=1723424&r1=1723423&r2=1723424&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jan  6 22:25:18 
2016
@@ -26,6 +26,9 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -35,7 +38,6 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
@@ -45,7 +47,6 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.newplan.Operator;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
@@ -394,6 +395,89 @@ public class TestPigRunner {
             Util.deleteFile(cluster, OUTPUT_FILE);
             Util.deleteFile(cluster, OUTPUT_FILE_2);
         }
+    }
+
+    @Test
+    public void simpleMultiQueryTest3() throws Exception {
+        final String INPUT_FILE_2 = "input2";
+        final String OUTPUT_FILE_2 = "output2";
+
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_2));
+        w.println("3\t4\t5");
+        w.println("5\t6\t7");
+        w.println("3\t7\t8");
+        w.close();
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2);
+        new File(INPUT_FILE_2).delete();
+
+        w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, 
a2:int);");
+        w.println("A1 = load '" + INPUT_FILE_2 + "' as (a0:int, a1:int, 
a2:int);");
+        w.println("B = filter A by a0 == 3;");
+        w.println("C = filter A by a1 <=5;");
+        w.println("D = join C by a0, B by a0, A1 by a0 using 'replicated';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.println("store D into '" + OUTPUT_FILE_2 + "';");
+        w.close();
+
+        try {
+            String[] args = { "-x", execType, PIG_FILE };
+            PigStats stats = PigRunner.run(args, new 
TestNotificationListener(execType));
+            assertTrue(stats.isSuccessful());
+            if (Util.isMapredExecType(cluster.getExecType())) {
+                assertEquals(3, stats.getJobGraph().size());
+            } else {
+                assertEquals(1, stats.getJobGraph().size());
+            }
+
+            // Each output file should include the following:
+            // output:
+            //   1\t2\t3\n
+            //   5\t3\t4\n
+            //   3\t4\t5\n
+            // output2:
+            //   3\t4\t5\t3\t4\t5\t3\t4\t5\n
+            //   3\t4\t5\t3\t4\t5\t3\t7\t8\n
+            //   3\t4\t5\t3\t7\t8\t3\t4\t5\n
+            //   3\t4\t5\t3\t7\t8\t3\t7\t8\n
+            final int numOfRecords1 = 3;
+            final int numOfRecords2 = 4;
+            final int numOfBytesWritten1 = 18;
+            final int numOfBytesWritten2 = 72;
+
+            assertEquals(numOfRecords1 + numOfRecords2, 
stats.getRecordWritten());
+            assertEquals(numOfBytesWritten1 + numOfBytesWritten2, 
stats.getBytesWritten());
+
+            List<String> outputNames = new 
ArrayList<String>(stats.getOutputNames());
+            assertTrue(outputNames.size() == 2);
+            Collections.sort(outputNames);
+            assertEquals(OUTPUT_FILE, outputNames.get(0));
+            assertEquals(OUTPUT_FILE_2, outputNames.get(1));
+            assertEquals(3, stats.getNumberRecords(OUTPUT_FILE));
+            assertEquals(4, stats.getNumberRecords(OUTPUT_FILE_2));
+
+            List<InputStats> inputStats = new 
ArrayList<InputStats>(stats.getInputStats());
+            assertTrue(inputStats.size() == 2);
+            Collections.sort(inputStats, new Comparator<InputStats>() {
+                @Override
+                public int compare(InputStats o1, InputStats o2) {
+                    return o1.getLocation().compareTo(o2.getLocation());
+                }
+            });
+            assertEquals(5, inputStats.get(0).getNumberRecords());
+            assertEquals(3, inputStats.get(1).getNumberRecords());
+            // For mapreduce, since hdfs bytes read includes replicated tables, 
bytes read is wrong.
+            // Since Tez has only one load per job, its values are correct.
+            if (!Util.isMapredExecType(cluster.getExecType())) {
+                assertEquals(30, inputStats.get(0).getBytes());
+                assertEquals(18, inputStats.get(1).getBytes());
+            }
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, INPUT_FILE_2);
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, OUTPUT_FILE_2);
+        }
     }
 
     @Test


Reply via email to