Author: rohini
Date: Mon Jun 5 20:56:21 2017
New Revision: 1797708

URL: http://svn.apache.org/viewvc?rev=1797708&view=rev
Log:
PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)

Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1797708&r1=1797707&r2=1797708&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Mon Jun 5 20:56:21 2017 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini) + PIG-5251: Bump joda-time to 2.9.9 (dbist13 via rohini) OPTIMIZATIONS Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1797708&r1=1797707&r2=1797708&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Mon Jun 5 20:56:21 2017 @@ -18,6 +18,9 @@ package org.apache.pig.backend.hadoop.executionengine.tez.runtime; import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashSet; @@ -65,6 +68,7 @@ import org.apache.pig.impl.util.UDFConte import org.apache.pig.impl.util.Utils; import org.apache.pig.tools.pigstats.PigStatusReporter; import org.apache.tez.common.TezUtils; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.UserPayload; import org.apache.tez.mapreduce.hadoop.MRConfig; import org.apache.tez.mapreduce.output.MROutput; @@ -107,6 
+111,7 @@ public class PigProcessor extends Abstra private Configuration conf; private PigHadoopLogger pigHadoopLogger; + private Object progressHelper; public static String sampleVertex; public static Map<String, Object> sampleMap; @@ -203,6 +208,21 @@ public class PigProcessor extends Abstra @Override public void close() throws Exception { + /* + * if (progressHelper != null) { + * progressHelper.shutDownProgressTaskService(); } + */ + try { + if (progressHelper != null) { + Class<?> clazz = Class.forName("org.apache.tez.common.ProgressHelper"); + Method shutDownProgressTaskService = clazz.getMethod("shutDownProgressTaskService"); + shutDownProgressTaskService.invoke(progressHelper); + } + } + catch (ClassNotFoundException | NoSuchMethodException | SecurityException | IllegalAccessException + | IllegalArgumentException | InvocationTargetException e) { + // ignore + } execPlan = null; fileOutputs = null; leaf = null; @@ -221,6 +241,26 @@ public class PigProcessor extends Abstra @Override public void run(Map<String, LogicalInput> inputs, Map<String, LogicalOutput> outputs) throws Exception { + /* + * progressHelper = new ProgressHelper(inputs, getContext(), + * this.getClass().getSimpleName()); + * progressHelper.scheduleProgressTaskService(100, Math.max(1000, + * conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, + * TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50)); + */ + try { + Class<?> clazz = Class.forName("org.apache.tez.common.ProgressHelper"); + Constructor<?> ctor = clazz.getConstructor(Map.class, ProcessorContext.class, String.class); + progressHelper = ctor.newInstance(inputs, getContext(), this.getClass().getSimpleName()); + Method scheduleProgressTaskService = clazz.getMethod("scheduleProgressTaskService", long.class, long.class); + scheduleProgressTaskService.invoke(progressHelper, 100, + Math.max(1000, conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS, + 
TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50)); + } + catch (IllegalAccessException | IllegalArgumentException | InstantiationException | InvocationTargetException + | ClassNotFoundException | NoSuchMethodException | SecurityException e) { + // ignore + } try { initializeInputs(inputs); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1797708&r1=1797707&r2=1797708&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Mon Jun 5 20:56:21 2017 @@ -95,8 +95,6 @@ public class MRToTezHelper { mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL); // TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez 0.8 mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency"); - // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8 - mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms"); // TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT TEZ-3271 in Tez 0.8.4 mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_FAILURES_MAX_PERCENT, "tez.vertex.failures.maxpercent"); @@ -105,8 +103,17 @@ public class MRToTezHelper { mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED); mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL); mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency"); - 
mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms"); mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, "tez.vertex.failures.maxpercent"); + + try { + Class.forName("org.apache.tez.common.ProgressHelper"); + // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8 + mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms"); + mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.task.progress.stuck.interval-ms"); + } + catch (ClassNotFoundException e) { + // Not translating before Tez 0.8.5 due to TEZ-3549 + } } private static void populateMRSettingsToRetain() {