Author: rohini
Date: Fri Jan 23 05:44:42 2015
New Revision: 1654121
URL: http://svn.apache.org/r1654121
Log:
PIG-4387: Honor yarn settings in tez-site.xml and optimize dag status fetch
(rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1654121&r1=1654120&r2=1654121&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Jan 23 05:44:42 2015
@@ -44,6 +44,8 @@ PIG-4333: Split BigData tests into multi
BUG FIXES
+PIG-4387: Honor yarn settings in tez-site.xml and optimize dag status fetch
(rohini)
+
PIG-4352: Port local mode tests to Tez - TestUnionOnSchema (daijy)
PIG-4359: Port local mode tests to Tez - part4 (daijy)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java?rev=1654121&r1=1654120&r2=1654121&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezExecutionEngine.java
Fri Jan 23 05:44:42 2015
@@ -18,14 +18,18 @@
package org.apache.pig.backend.hadoop.executionengine.tez;
+import java.util.Properties;
import java.util.UUID;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
import org.apache.pig.impl.PigContext;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.pig.tools.pigstats.tez.TezScriptState;
+import org.apache.tez.dag.api.TezConfiguration;
public class TezExecutionEngine extends HExecutionEngine {
@@ -45,4 +49,11 @@ public class TezExecutionEngine extends
public PigStats instantiatePigStats() {
return new TezPigScriptStats(pigContext);
}
+
+ @Override
+ public JobConf getExecConf(Properties properties) throws ExecException {
+ JobConf jc = super.getExecConf(properties);
+ jc.addResource(TezConfiguration.TEZ_SITE_XML);
+ return jc;
+ }
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1654121&r1=1654120&r2=1654121&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
(original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
Fri Jan 23 05:44:42 2015
@@ -19,7 +19,6 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -177,13 +176,18 @@ public class TezJob implements Runnable
while (true) {
try {
- dagStatus = dagClient.getDAGStatus(statusGetOpts);
+ dagStatus = dagClient.getDAGStatus(null);
} catch (Exception e) {
log.info("Cannot retrieve DAG status", e);
break;
}
if (dagStatus.isCompleted()) {
+ try {
+ dagStatus = dagClient.getDAGStatus(statusGetOpts);
+ } catch (Exception e) {
+ log.warn("Failed to retrieve DAG counters", e);
+ }
// For tez_local mode where PigProcessor destroys all
UDFContext
UDFContext.setUdfContext(udfContext);
@@ -219,16 +223,10 @@ public class TezJob implements Runnable
private class DAGStatusReporter extends TimerTask {
- private final String LINE_SEPARATOR =
System.getProperty("line.separator");
-
@Override
public void run() {
if (dagStatus == null) return;
- String msg = "status=" + dagStatus.getState()
- + ", progress=" + dagStatus.getDAGProgress()
- + ", diagnostics="
- + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR);
- log.info("DAG Status: " + msg);
+ log.info("DAG Status: " + dagStatus.toString());
}
}
@@ -248,7 +246,7 @@ public class TezJob implements Runnable
public String getDiagnostics() {
try {
if (dagClient != null && dagStatus == null) {
- dagStatus = dagClient.getDAGStatus(new
HashSet<StatusGetOpts>());
+ dagStatus = dagClient.getDAGStatus(null);
}
if (dagStatus != null) {
return StringUtils.join(dagStatus.getDiagnostics(), "\n");