Author: rohini
Date: Mon Jun 5 20:56:21 2017
New Revision: 1797708
URL: http://svn.apache.org/viewvc?rev=1797708&view=rev
Log:
PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1797708&r1=1797707&r2=1797708&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun 5 20:56:21 2017
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4700: Enable progress reporting for Tasks in Tez (satishsaley via rohini)
+
PIG-5251: Bump joda-time to 2.9.9 (dbist13 via rohini)
OPTIMIZATIONS
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1797708&r1=1797707&r2=1797708&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
Mon Jun 5 20:56:21 2017
@@ -18,6 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
@@ -65,6 +68,7 @@ import org.apache.pig.impl.util.UDFConte
import org.apache.pig.impl.util.Utils;
import org.apache.pig.tools.pigstats.PigStatusReporter;
import org.apache.tez.common.TezUtils;
+import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.mapreduce.output.MROutput;
@@ -107,6 +111,7 @@ public class PigProcessor extends Abstra
private Configuration conf;
private PigHadoopLogger pigHadoopLogger;
+ private Object progressHelper;
public static String sampleVertex;
public static Map<String, Object> sampleMap;
@@ -203,6 +208,21 @@ public class PigProcessor extends Abstra
@Override
public void close() throws Exception {
+ /*
+ * Reflective equivalent of: if (progressHelper != null) {
+ * progressHelper.shutDownProgressTaskService(); }
+ */
+ try {
+ if (progressHelper != null) {
+ Class<?> clazz =
Class.forName("org.apache.tez.common.ProgressHelper");
+ Method shutDownProgressTaskService =
clazz.getMethod("shutDownProgressTaskService");
+ shutDownProgressTaskService.invoke(progressHelper);
+ }
+ }
+ catch (ClassNotFoundException | NoSuchMethodException |
SecurityException | IllegalAccessException
+ | IllegalArgumentException | InvocationTargetException e) {
+ // Best effort: ignore if ProgressHelper is unavailable or its shutdown cannot be invoked reflectively
+ }
execPlan = null;
fileOutputs = null;
leaf = null;
@@ -221,6 +241,26 @@ public class PigProcessor extends Abstra
@Override
public void run(Map<String, LogicalInput> inputs,
Map<String, LogicalOutput> outputs) throws Exception {
+ /*
+ * Reflective equivalent of: progressHelper = new ProgressHelper(inputs, getContext(),
+ * this.getClass().getSimpleName());
+ * progressHelper.scheduleProgressTaskService(100, Math.max(1000,
+ * conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+ * TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+ */
+ try {
+ Class<?> clazz =
Class.forName("org.apache.tez.common.ProgressHelper");
+ Constructor<?> ctor = clazz.getConstructor(Map.class,
ProcessorContext.class, String.class);
+ progressHelper = ctor.newInstance(inputs, getContext(),
this.getClass().getSimpleName());
+ Method scheduleProgressTaskService =
clazz.getMethod("scheduleProgressTaskService", long.class, long.class);
+ scheduleProgressTaskService.invoke(progressHelper, 100,
+ Math.max(1000,
conf.getInt(TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS,
+
TezConfiguration.TEZ_TASK_AM_HEARTBEAT_INTERVAL_MS_DEFAULT) - 50));
+ }
+ catch (IllegalAccessException | IllegalArgumentException |
InstantiationException | InvocationTargetException
+ | ClassNotFoundException | NoSuchMethodException |
SecurityException e) {
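+ // Best effort: ignore if ProgressHelper is unavailable or cannot be started reflectively; the task proceeds without it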
+ }
try {
initializeInputs(inputs);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1797708&r1=1797707&r2=1797708&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
Mon Jun 5 20:56:21 2017
@@ -95,8 +95,6 @@ public class MRToTezHelper {
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_LOG_LEVEL,
TezConfiguration.TEZ_TASK_LOG_LEVEL);
// TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY TEZ-2914 in Tez
0.8
mrMapParamToTezVertexParamMap.put("mapreduce.job.running.map.limit",
"tez.am.vertex.max-task-concurrency");
- // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez
0.8
- mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT,
"tez.am.progress.stuck.interval-ms");
// TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT TEZ-3271 in Tez
0.8.4
mrMapParamToTezVertexParamMap.put(MRJobConfig.MAP_FAILURES_MAX_PERCENT,
"tez.vertex.failures.maxpercent");
@@ -105,8 +103,17 @@ public class MRToTezHelper {
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE,
TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL,
TezConfiguration.TEZ_TASK_LOG_LEVEL);
mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit",
"tez.am.vertex.max-task-concurrency");
- mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT,
"tez.am.progress.stuck.interval-ms");
mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_FAILURES_MAXPERCENT,
"tez.vertex.failures.maxpercent");
+
+ try {
+ Class.forName("org.apache.tez.common.ProgressHelper");
+ // TezConfiguration.TEZ_TASK_PROGRESS_STUCK_INTERVAL_MS TEZ-808 in Tez 0.8
+ mrMapParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT,
"tez.task.progress.stuck.interval-ms");
+ mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT,
"tez.task.progress.stuck.interval-ms");
+ }
+ catch (ClassNotFoundException e) {
+ // ProgressHelper class not found: MRJobConfig.TASK_TIMEOUT is not translated before Tez 0.8.5 due to TEZ-3549
+ }
}
private static void populateMRSettingsToRetain() {