Author: rohini
Date: Mon Jun  5 20:56:21 2017
New Revision: 1797708

URL: http://svn.apache.org/viewvc?rev=1797708&view=rev
Log:
PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)

Modified:
    pig/trunk/CHANGES.txt
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
    
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java

Modified: pig/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1797708&r1=1797707&r2=1797708&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun  5 20:56:21 2017
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)
+
 PIG-5251: Bump joda-time to 2.9.9 (dbist13 via rohini)
  
 OPTIMIZATIONS

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1797708&r1=1797707&r2=1797708&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
 Mon Jun  5 20:56:21 2017
@@ -18,6 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -65,6 +68,7 @@ import org.apache.pig.impl.util.UDFConte
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.UserPayload;
 import org.apache.tez.mapreduce.hadoop.MRConfig;
 import org.apache.tez.mapreduce.output.MROutput;
@@ -107,6 +111,7 @@ public class PigProcessor extends Abstra
 
     private Configuration conf;
     private PigHadoopLogger pigHadoopLogger;
+    private Object progressHelper;
 
     public static String sampleVertex;
     public static Map<String, Object> sampleMap;
@@ -203,6 +208,21 @@ public class PigProcessor extends Abstra
 
     @Override
     public void close() throws Exception {
+        /*
+         * if (progressHelper != null) {
+         * progressHelper.shutDownProgressTaskService(); }
+         */
+        try {
+            if (progressHelper != null) {
+                Class<?> clazz = 
Class.forName("org.apache.tez.common.ProgressHelper");
+                Method shutDownProgressTaskService = 
clazz.getMethod("shutDownProgressTaskService");
+                shutDownProgressTaskService.invoke(progressHelper);
+            }
+        }
+        catch (ClassNotFoundException | NoSuchMethodException | 
SecurityException | IllegalAccessException
+                | IllegalArgumentException | InvocationTargetException e) {
+            // ignore
+        }
         execPlan = null;
         fileOutputs = null;
         leaf = null;
@@ -221,6 +241,26 @@ public class PigProcessor extends Abstra
     @Override
     public void run(Map<String, LogicalInput> inputs,
             Map<String, LogicalOutput> outputs) throws Exception {
+        /*
+         * progressHelper = new ProgressHelper(inputs, getContext(),
+         * this.getClass().getSimpleName());
+         * progressHelper.scheduleProgressTaskService(100, Math.max(1000,
+         * conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+         * TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+         */
+        try {
+            Class<?> clazz = 
Class.forName("org.apache.tez.common.ProgressHelper");
+            Constructor<?> ctor = clazz.getConstructor(Map.class, 
ProcessorContext.class, String.class);
+            progressHelper = ctor.newInstance(inputs, getContext(), 
this.getClass().getSimpleName());
+            Method scheduleProgressTaskService = 
clazz.getMethod("scheduleProgressTaskService", long.class, long.class);
+            scheduleProgressTaskService.invoke(progressHelper, 100,
+                    Math.max(1000, 
conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+                            
TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+        }
+        catch (IllegalAccessException | IllegalArgumentException | 
InstantiationException | InvocationTargetException
+                | ClassNotFoundException | NoSuchMethodException | 
SecurityException e) {
+            // ignore
+        }
 
         try {
             initializeInputs(inputs);

Modified: 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: 
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1797708&r1=1797707&r2=1797708&view=diff
==============================================================================
--- 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 (original)
+++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
 Mon Jun  5 20:56:21 2017
@@ -95,8 +95,6 @@ public class MRToTezHelper {
         mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, 
TezConfiguration.TEZ_TASK_LOG_LEVEL);
         // TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 
0.8
         mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", 
"tez.am.vertex.max-task-concurrency");
-        // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 
0.8
-        mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.am.progress.stuck.interval-ms");
         // TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT TEZ-3271 in Tez 
0.8.4
         
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 
"tez.vertex.failures.maxpercent");
 
@@ -105,8 +103,17 @@ public class MRToTezHelper {
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, 
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, 
TezConfiguration.TEZ_TASK_LOG_LEVEL);
         
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", 
"tez.am.vertex.max-task-concurrency");
-        mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.am.progress.stuck.interval-ms");
         
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 
"tez.vertex.failures.maxpercent");
+
+        try {
+            Class.forName("org.apache.tez.common.ProgressHelper");
+            // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in 
Tez 0.8
+            mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.task.progress.stuck.interval-ms");
+            mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, 
"tez.task.progress.stuck.interval-ms");
+        }
+        catch (ClassNotFoundException e) {
+            // Not translating before Tez 0.8.5 due to TEZ-3549
+        }
     }
 
     private static void populateMRSettingsToRetain() {


Reply via email to