Repository: sqoop Updated Branches: refs/heads/sqoop2 89d9660c7 -> cf9af0081
SQOOP-1879: Sqoop2: Submission Engine does not set all details on SubmissionRecord in Local mode (Veena Basavaraj via Abraham Elmahrek) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/cf9af008 Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/cf9af008 Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/cf9af008 Branch: refs/heads/sqoop2 Commit: cf9af0081296c779a6d5245b00a2afb7815709b8 Parents: 89d9660 Author: Abraham Elmahrek <[email protected]> Authored: Tue Dec 16 18:07:53 2014 -0800 Committer: Abraham Elmahrek <[email protected]> Committed: Tue Dec 16 18:08:22 2014 -0800 ---------------------------------------------------------------------- .../org/apache/sqoop/driver/JobManager.java | 2 +- .../org/apache/sqoop/driver/JobRequest.java | 4 +- .../mapreduce/MapreduceSubmissionEngine.java | 95 ++++++++++++++------ .../sqoop/test/testcases/ConnectorTestCase.java | 2 +- 4 files changed, 71 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/core/src/main/java/org/apache/sqoop/driver/JobManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobManager.java b/core/src/main/java/org/apache/sqoop/driver/JobManager.java index f286c02..dc441bc 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobManager.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobManager.java @@ -335,7 +335,7 @@ public class JobManager implements Reconfigurable { // Create a job request for submit/execution JobRequest jobRequest = executionEngine.createJobRequest(); // Save important variables to the job request - jobRequest.setSummary(submission); + jobRequest.setJobSubmission(submission); jobRequest.setConnector(Direction.FROM, fromConnector); jobRequest.setConnector(Direction.TO, toConnector); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/core/src/main/java/org/apache/sqoop/driver/JobRequest.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java index d2496bd..cfa45b2 100644 --- a/core/src/main/java/org/apache/sqoop/driver/JobRequest.java +++ b/core/src/main/java/org/apache/sqoop/driver/JobRequest.java @@ -28,8 +28,6 @@ import org.apache.sqoop.model.MSubmission; import org.apache.sqoop.utils.ClassUtils; import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Set; /** @@ -140,7 +138,7 @@ public class JobRequest { return jobSubmission; } - public void setSummary(MSubmission submission) { + public void setJobSubmission(MSubmission submission) { this.jobSubmission = submission; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 22a9736..d15bcfc 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -246,17 +246,14 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { // If we're in local mode than wait on completion. Local job runner do not // seems to be exposing API to get previously submitted job which makes // other methods of the submission engine quite useless. - if(isLocal()) { - job.waitForCompletion(true); + // NOTE: The minicluster mode is not local. It runs similar to a real MR cluster but + // only that it is in the same JVM + if (isLocal()) { + submitToLocalRunner(request, job); } else { - job.submit(); + submitToCluster(request, job); } - - String jobId = job.getJobID().toString(); - request.getJobSubmission().setExternalJobId(jobId); - request.getJobSubmission().setExternalLink(job.getTrackingURL()); - - LOG.debug("Executed new map-reduce job with id " + jobId); + LOG.debug("Executed new map-reduce job with id " + job.getJobID().toString()); } catch (Exception e) { SubmissionError error = new SubmissionError(); error.setErrorSummary(e.toString()); @@ -272,6 +269,32 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { return true; } + private void submitToCluster(MRJobRequest request, Job job) throws IOException, InterruptedException, ClassNotFoundException { + job.submit(); + request.getJobSubmission().setExternalJobId(job.getJobID().toString()); + request.getJobSubmission().setExternalLink(job.getTrackingURL()); + } + + private void submitToLocalRunner(MRJobRequest request, Job job) throws IOException, InterruptedException, + ClassNotFoundException { + boolean successful = job.waitForCompletion(true); + if (successful) { + request.getJobSubmission().setStatus(SubmissionStatus.SUCCEEDED); + } else { + // treat any other state as failed + request.getJobSubmission().setStatus(SubmissionStatus.FAILED); + } + request.getJobSubmission().setExternalJobId(job.getJobID().toString()); + request.getJobSubmission().setExternalLink(job.getTrackingURL()); + + request.getJobSubmission().setStatus(convertMapreduceState(job.getJobState().getValue())); + // there is no failure info in this job api, unlike the running job + request.getJobSubmission().setError(null); + request.getJobSubmission().setProgress((job.mapProgress() + job.reduceProgress()) / 2); + request.getJobSubmission().setCounters(convertHadoop2MapreduceCounters(job.getCounters())); + request.getJobSubmission().setLastUpdateDate(new Date()); + } + /** * {@inheritDoc} */ @@ -342,20 +365,12 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { return null; } - return convertMapreduceCounters(runningJob.getCounters()); + return convertHadoop1MapreduceCounters(runningJob.getCounters()); } catch (IOException e) { throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); } } - private String externalLink(RunningJob runningJob) { - if (runningJob == null) { - return null; - } - - return runningJob.getTrackingURL(); - } - /** * Convert map-reduce specific job status constants to Sqoop job status * constants. @@ -382,21 +397,22 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { /** - * Convert Hadoop counters to Sqoop counters. + * Convert Hadoop1 counters to Sqoop counters. * * @param hadoopCounters Hadoop counters * @return Appropriate Sqoop counters */ - private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) { + + private Counters convertHadoop1MapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) { Counters sqoopCounters = new Counters(); - if(hadoopCounters == null) { + if (hadoopCounters == null) { return sqoopCounters; } - for(org.apache.hadoop.mapred.Counters.Group hadoopGroup : hadoopCounters) { - CounterGroup sqoopGroup = new CounterGroup(hadoopGroup.getName()); - for(org.apache.hadoop.mapred.Counters.Counter hadoopCounter : hadoopGroup) { + for (org.apache.hadoop.mapred.Counters.Group counterGroup : hadoopCounters) { + CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName()); + for (org.apache.hadoop.mapred.Counters.Counter hadoopCounter : counterGroup) { Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue()); sqoopGroup.addCounter(sqoopCounter); } @@ -407,6 +423,32 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { } /** + * Convert Hadoop2 counters to Sqoop counters. + * + * @param hadoopCounters Hadoop counters + * @return Appropriate Sqoop counters + */ + private Counters convertHadoop2MapreduceCounters(org.apache.hadoop.mapreduce.Counters hadoopCounters) { + Counters sqoopCounters = new Counters(); + + if (hadoopCounters == null) { + return sqoopCounters; + } + + for (org.apache.hadoop.mapreduce.CounterGroup counterGroup : hadoopCounters) { + CounterGroup sqoopGroup = new CounterGroup(counterGroup.getName()); + for (org.apache.hadoop.mapreduce.Counter hadoopCounter : counterGroup) { + Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue()); + sqoopGroup.addCounter(sqoopCounter); + } + sqoopCounters.addCounterGroup(sqoopGroup); + } + + return sqoopCounters; + } + + + /** * {@inheritDoc} */ @Override @@ -419,19 +461,18 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { SubmissionStatus newStatus = status(runningJob); SubmissionError error = error(runningJob); - String externalLink = externalLink(runningJob); if (newStatus.isRunning()) { progress = progress(runningJob); } else { counters = counters(runningJob); } - + // these properties change as the job runs, rest of the submission attributes + // do not change as job runs submission.setStatus(newStatus); submission.setError(error); submission.setProgress(progress); submission.setCounters(counters); - submission.setExternalLink(externalLink); submission.setLastUpdateDate(new Date()); RepositoryManager.getInstance().getRepository().updateSubmission(submission); http://git-wip-us.apache.org/repos/asf/sqoop/blob/cf9af008/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java index a423785..9a76c4b 100644 --- a/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java +++ b/test/src/main/java/org/apache/sqoop/test/testcases/ConnectorTestCase.java @@ -249,7 +249,7 @@ abstract public class ConnectorTestCase extends TomcatTestCase { LOG.error("Submission has failed: " + finalSubmission.getError().getErrorSummary()); LOG.error("Corresponding error details: " + finalSubmission.getError().getErrorDetails()); } - assertEquals("Submission has failed with " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus()); + assertEquals("Submission finished with error: " + finalSubmission.getError().getErrorSummary(), SubmissionStatus.SUCCEEDED, finalSubmission.getStatus()); } /**
