Author: cdouglas
Date: Fri Jun 3 23:30:53 2011
New Revision: 1131286
URL: http://svn.apache.org/viewvc?rev=1131286&view=rev
Log:
MAPREDUCE-2535. Fix NPE in JobClient caused by retirement.
Contributed by Robert Joseph Evans
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1131286&r1=1131285&r2=1131286&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Jun 3 23:30:53 2011
@@ -47,6 +47,9 @@ Release 0.20.205.0 - unreleased
MAPREDUCE-2558. Add JobTracker metrics for scheduling queues. (Jeffrey
Naisbitt via cdouglas)
+ MAPREDUCE-2535. Fix NPE in JobClient caused by retirement. (Robert Joseph
+ Evans via cdouglas)
+
Release 0.20.204.0 - unreleased
NEW FEATURES
Modified:
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1131286&r1=1131285&r2=1131286&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobClient.java Fri Jun 3 23:30:53 2011
@@ -183,7 +183,8 @@ public class JobClient extends Configure
* a JobProfile object to provide some info, and interacts with the
* remote service to provide certain functionality.
*/
- class NetworkedJob implements RunningJob {
+ static class NetworkedJob implements RunningJob {
+ private JobSubmissionProtocol jobSubmitClient;
JobProfile profile;
JobStatus status;
long statustime;
@@ -191,16 +192,26 @@ public class JobClient extends Configure
/**
* We store a JobProfile and a timestamp for when we last
* acquired the job profile. If the job is null, then we cannot
- * perform any of the tasks. The job might be null if the JobTracker
- * has completely forgotten about the job. (eg, 24 hours after the
- * job completes.)
+ * perform any of the tasks, so we throw an exception.
+ * The job might be null if the JobTracker has completely forgotten
+ * about the job. (eg, 24 hours after the job completes.)
*/
- public NetworkedJob(JobStatus job) throws IOException {
+ public NetworkedJob(JobStatus job, JobProfile prof, JobSubmissionProtocol jobSubmitClient) throws IOException {
this.status = job;
- this.profile = jobSubmitClient.getJobProfile(job.getJobID());
+ this.profile = prof;
+ this.jobSubmitClient = jobSubmitClient;
+ if(this.status == null) {
+ throw new IOException("The Job status cannot be null");
+ }
+ if(this.profile == null) {
+ throw new IOException("The Job profile cannot be null");
+ }
+ if(this.jobSubmitClient == null) {
+ throw new IOException("The Job Submission Protocol cannot be null");
+ }
this.statustime = System.currentTimeMillis();
}
-
+
/**
* Some methods rely on having a recent job profile object. Refresh
* it, if necessary
@@ -217,6 +228,9 @@ public class JobClient extends Configure
*/
synchronized void updateStatus() throws IOException {
this.status = jobSubmitClient.getJobStatus(profile.getJobID());
+ if(this.status == null) {
+ throw new IOException("The job appears to have been removed.");
+ }
this.statustime = System.currentTimeMillis();
}
@@ -863,8 +877,9 @@ public class JobClient extends Configure
printTokens(jobId, jobCopy.getCredentials());
status = jobSubmitClient.submitJob(
jobId, submitJobDir.toString(), jobCopy.getCredentials());
- if (status != null) {
- return new NetworkedJob(status);
+ JobProfile prof = jobSubmitClient.getJobProfile(jobId);
+ if (status != null && prof != null) {
+ return new NetworkedJob(status, prof, jobSubmitClient);
} else {
throw new IOException("Could not launch job");
}
@@ -1011,8 +1026,9 @@ public class JobClient extends Configure
*/
public RunningJob getJob(JobID jobid) throws IOException {
JobStatus status = jobSubmitClient.getJobStatus(jobid);
- if (status != null) {
- return new NetworkedJob(status);
+ JobProfile profile = jobSubmitClient.getJobProfile(jobid);
+ if (status != null && profile != null) {
+ return new NetworkedJob(status, profile, jobSubmitClient);
} else {
return null;
}
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java
URL:
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java?rev=1131286&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestNetworkedJob.java Fri Jun 3 23:30:53 2011
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+import static org.mockito.Mockito.*;
+
+public class TestNetworkedJob {
+
+ @Test(expected=IOException.class)
+ public void testNullStatus() throws Exception {
+ JobProfile mockProf = mock(JobProfile.class);
+ JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+ new JobClient.NetworkedJob(null, mockProf, mockClient);
+ }
+
+ @Test(expected=IOException.class)
+ public void testNullProfile() throws Exception {
+ JobStatus mockStatus = mock(JobStatus.class);
+ JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+ new JobClient.NetworkedJob(mockStatus, null, mockClient);
+ }
+
+ @Test(expected=IOException.class)
+ public void testNullClient() throws Exception {
+ JobStatus mockStatus = mock(JobStatus.class);
+ JobProfile mockProf = mock(JobProfile.class);
+ new JobClient.NetworkedJob(mockStatus, mockProf, null);
+ }
+
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testBadUpdate() throws Exception {
+ JobStatus mockStatus = mock(JobStatus.class);
+ JobProfile mockProf = mock(JobProfile.class);
+ JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+
+ JobID id = new JobID("test",0);
+
+ RunningJob rj = new JobClient.NetworkedJob(mockStatus, mockProf, mockClient);
+
+ when(mockProf.getJobID()).thenReturn(id);
+ when(mockClient.getJobStatus(id)).thenReturn(null);
+
+ boolean caught = false;
+ try {
+ rj.isSuccessful();
+ } catch(IOException e) {
+ caught = true;
+ }
+ assertTrue("Expected updateStatus to throw an IOException bt it did not", caught);
+
+ //verification
+ verify(mockProf).getJobID();
+ verify(mockClient).getJobStatus(id);
+ }
+
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testGetNullCounters() throws Exception {
+ JobStatus mockStatus = mock(JobStatus.class);
+ JobProfile mockProf = mock(JobProfile.class);
+ JobSubmissionProtocol mockClient = mock(JobSubmissionProtocol.class);
+ RunningJob underTest =
+ new JobClient.NetworkedJob(mockStatus, mockProf, mockClient);
+
+ JobID id = new JobID("test", 0);
+ when(mockProf.getJobID()).thenReturn(id);
+ when(mockClient.getJobCounters(id)).thenReturn(null);
+ assertNull(underTest.getCounters());
+ //verification
+ verify(mockClient).getJobCounters(id);
+ }
+
+}