Updated Branches: refs/heads/TEZ-1 e3c8f2081 -> b102eb1c6
TEZ-104. Fix build after YARN API changes introduced by YARN-142 sub-tasks. Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/b102eb1c Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/b102eb1c Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/b102eb1c Branch: refs/heads/TEZ-1 Commit: b102eb1c695f13b91aba7cf9eae280984313e3ef Parents: e3c8f20 Author: Siddharth Seth <[email protected]> Authored: Tue May 7 11:09:23 2013 -0700 Committer: Siddharth Seth <[email protected]> Committed: Tue May 7 11:09:23 2013 -0700 ---------------------------------------------------------------------- .../tez/mapreduce/ClientServiceDelegate.java | 8 +- .../apache/tez/mapreduce/ResourceMgrDelegate.java | 70 ++++++++-- .../java/org/apache/tez/mapreduce/YARNRunner.java | 103 ++++++++------- 3 files changed, 120 insertions(+), 61 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b102eb1c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java index 2e840fb..9cdc845 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ClientServiceDelegate.java @@ -76,8 +76,12 @@ public class ClientServiceDelegate { public JobStatus getJobStatus(JobID oldJobID) throws IOException { org.apache.hadoop.mapreduce.v2.api.records.JobId jobId = TypeConverter.toYarn(oldJobID); - ApplicationReport appReport = - rm.getApplicationReport(jobId.getAppId()); + ApplicationReport appReport; + try { + appReport = rm.getApplicationReport(jobId.getAppId()); + } catch (YarnRemoteException e) { + throw new IOException(e); + } JobStatus jobStatus = new DAGJobStatus(appReport); return jobStatus; http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b102eb1c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java index 768dc53..2561b56 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/ResourceMgrDelegate.java @@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.client.YarnClientImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.util.ProtoUtils; import org.apache.tez.mapreduce.hadoop.MRJobConfig; @@ -64,11 +65,19 @@ public class ResourceMgrDelegate extends YarnClientImpl { public TaskTrackerInfo[] getActiveTrackers() throws IOException, InterruptedException { - return TypeConverter.fromYarnNodes(super.getNodeReports()); + try { + return TypeConverter.fromYarnNodes(super.getNodeReports()); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public JobStatus[] getAllJobs() throws IOException, InterruptedException { - return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf); + try { + return TypeConverter.fromYarnApps(super.getApplicationList(), this.conf); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, @@ -80,7 +89,12 @@ public class ResourceMgrDelegate extends YarnClientImpl { public ClusterMetrics getClusterMetrics() throws IOException, InterruptedException { - YarnClusterMetrics metrics = super.getYarnClusterMetrics(); + YarnClusterMetrics metrics; + try { + metrics = super.getYarnClusterMetrics(); + } catch (YarnRemoteException e) { + throw new IOException(e); + } ClusterMetrics oldMetrics = new ClusterMetrics(1, 1, 1, 1, 1, 1, metrics.getNumNodeManagers() * 10, metrics.getNumNodeManagers() * 2, 1, metrics.getNumNodeManagers(), 0, 0); @@ -90,8 +104,12 @@ public class ResourceMgrDelegate extends YarnClientImpl { @SuppressWarnings("rawtypes") public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { - return ProtoUtils.convertFromProtoFormat( - super.getRMDelegationToken(renewer), rmAddress); + try { + return ProtoUtils.convertFromProtoFormat( + super.getRMDelegationToken(renewer), rmAddress); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public String getFilesystemName() throws IOException, InterruptedException { @@ -99,35 +117,59 @@ public class ResourceMgrDelegate extends YarnClientImpl { } public JobID getNewJobID() throws IOException, InterruptedException { - this.application = super.getNewApplication(); + try { + this.application = super.getNewApplication(); + } catch (YarnRemoteException e) { + throw new IOException(e); + } this.applicationId = this.application.getApplicationId(); return TypeConverter.fromYarn(applicationId); } public QueueInfo getQueue(String queueName) throws IOException, InterruptedException { - return TypeConverter.fromYarn( - super.getQueueInfo(queueName), this.conf); + try { + return TypeConverter.fromYarn( + super.getQueueInfo(queueName), this.conf); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, InterruptedException { - return TypeConverter.fromYarnQueueUserAclsInfo(super - .getQueueAclsInfo()); + try { + return TypeConverter.fromYarnQueueUserAclsInfo(super + .getQueueAclsInfo()); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public QueueInfo[] getQueues() throws IOException, InterruptedException { - return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf); + try { + return TypeConverter.fromYarnQueueInfo(super.getAllQueues(), this.conf); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public QueueInfo[] getRootQueues() throws IOException, InterruptedException { - return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf); + try { + return TypeConverter.fromYarnQueueInfo(super.getRootQueueInfos(), this.conf); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public QueueInfo[] getChildQueues(String parent) throws IOException, InterruptedException { - return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent), - this.conf); + try { + return TypeConverter.fromYarnQueueInfo(super.getChildQueueInfos(parent), + this.conf); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } public String getStagingAreaDir() throws IOException, InterruptedException { http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b102eb1c/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ---------------------------------------------------------------------- diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java index 35aa0bb..715a364 100644 --- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java +++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java @@ -92,6 +92,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.Apps; @@ -887,51 +888,51 @@ public class YARNRunner implements ClientProtocol { jobLocalResources); // Submit to ResourceManager - ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); - - ApplicationReport appMasterReport = resMgrDelegate - .getApplicationReport(applicationId); - String diagnostics = - (appMasterReport == null ? - "application report is null" : appMasterReport.getDiagnostics()); - if (appMasterReport == null - || appMasterReport.getYarnApplicationState() == YarnApplicationState.FAILED - || appMasterReport.getYarnApplicationState() == YarnApplicationState.KILLED) { - throw new IOException("Failed to run job : " + - diagnostics); - } + try { + ApplicationId applicationId = resMgrDelegate + .submitApplication(appContext); + + ApplicationReport appMasterReport = resMgrDelegate + .getApplicationReport(applicationId); + String diagnostics = (appMasterReport == null ? "application report is null" + : appMasterReport.getDiagnostics()); + if (appMasterReport == null + || appMasterReport.getYarnApplicationState() == YarnApplicationState.FAILED + || appMasterReport.getYarnApplicationState() == YarnApplicationState.KILLED) { + throw new IOException("Failed to run job : " + diagnostics); + } - if (LOG.isDebugEnabled()) { - while (true) { - appMasterReport = resMgrDelegate - .getApplicationReport(applicationId); - diagnostics = - (appMasterReport == null ? - "application report is null" - : appMasterReport.getDiagnostics()); - if (appMasterReport == null) { - throw new IOException("Failed to run job : " + - diagnostics); - } - YarnApplicationState state = appMasterReport.getYarnApplicationState(); - if (state.equals(YarnApplicationState.FAILED) - || state.equals(YarnApplicationState.FINISHED) - || state.equals(YarnApplicationState.KILLED)) { - LOG.info("Job completed" - + ", finalStatus=" + appMasterReport.getFinalApplicationStatus() - + ", finalState=" + appMasterReport.getYarnApplicationState() - + ", diagnostics=" + diagnostics); - break; - } else { - LOG.info("Job in progress" - + ", finalStatus=" + appMasterReport.getFinalApplicationStatus() - + ", finalState=" + appMasterReport.getYarnApplicationState()); - } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { + if (LOG.isDebugEnabled()) { + while (true) { + appMasterReport = resMgrDelegate.getApplicationReport(applicationId); + diagnostics = (appMasterReport == null ? "application report is null" + : appMasterReport.getDiagnostics()); + if (appMasterReport == null) { + throw new IOException("Failed to run job : " + diagnostics); + } + YarnApplicationState state = appMasterReport + .getYarnApplicationState(); + if (state.equals(YarnApplicationState.FAILED) + || state.equals(YarnApplicationState.FINISHED) + || state.equals(YarnApplicationState.KILLED)) { + LOG.info("Job completed" + ", finalStatus=" + + appMasterReport.getFinalApplicationStatus() + ", finalState=" + + appMasterReport.getYarnApplicationState() + ", diagnostics=" + + diagnostics); + break; + } else { + LOG.info("Job in progress" + ", finalStatus=" + + appMasterReport.getFinalApplicationStatus() + ", finalState=" + + appMasterReport.getYarnApplicationState()); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } } } + } catch (YarnRemoteException e) { + throw new IOException(e); } // FIXME @@ -1011,7 +1012,11 @@ public class YARNRunner implements ClientProtocol { /* check if the status is not running, if not send kill to RM */ JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0); if (status.getState() != JobStatus.State.RUNNING) { - resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + try { + resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + } catch (YarnRemoteException e) { + throw new IOException(e); + } return; } @@ -1035,7 +1040,11 @@ public class YARNRunner implements ClientProtocol { LOG.debug("Error when checking for application status", io); } if (status.getState() != JobStatus.State.KILLED) { - resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + try { + resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } } @@ -1066,7 +1075,11 @@ public class YARNRunner implements ClientProtocol { @Override public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) throws IOException { - return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID); + try { + return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID); + } catch (YarnRemoteException e) { + throw new IOException(e); + } } private static void warnForJavaLibPath(String opts, String component,
