Updated Branches: refs/heads/sqoop2 086409b6a -> 4c2a343d9
SQOOP-678: Add counters handling to map reduce submission engine (Jarek Jarcec Cecho via Cheolsoo Park) Project: http://git-wip-us.apache.org/repos/asf/sqoop/repo Commit: http://git-wip-us.apache.org/repos/asf/sqoop/commit/4c2a343d Tree: http://git-wip-us.apache.org/repos/asf/sqoop/tree/4c2a343d Diff: http://git-wip-us.apache.org/repos/asf/sqoop/diff/4c2a343d Branch: refs/heads/sqoop2 Commit: 4c2a343d96da9547c7c4c7f83782103310b3c0fe Parents: 086409b Author: Cheolsoo Park <[email protected]> Authored: Fri Jan 4 02:50:02 2013 -0800 Committer: Cheolsoo Park <[email protected]> Committed: Fri Jan 4 02:50:02 2013 -0800 ---------------------------------------------------------------------- .../apache/sqoop/framework/FrameworkManager.java | 2 +- .../apache/sqoop/framework/SubmissionEngine.java | 4 +- .../mapreduce/MapreduceSubmissionEngine.java | 39 +++++++++++++- 3 files changed, 39 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/sqoop/blob/4c2a343d/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java index 40e1f0b..4d48b27 100644 --- a/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java +++ b/core/src/main/java/org/apache/sqoop/framework/FrameworkManager.java @@ -566,7 +566,7 @@ public final class FrameworkManager { if(newStatus.isRunning()) { progress = submissionEngine.progress(externalId); } else { - counters = submissionEngine.stats(externalId); + counters = submissionEngine.counters(externalId); } submission.setStatus(newStatus); http://git-wip-us.apache.org/repos/asf/sqoop/blob/4c2a343d/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java index 71e4ec9..3c0f6eb 100644 --- a/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java +++ b/core/src/main/java/org/apache/sqoop/framework/SubmissionEngine.java @@ -91,13 +91,13 @@ public abstract class SubmissionEngine { /** * Return statistics for given submission id. * - * Sqoop framework will call stats only for submission in state SUCCEEDED, + * Sqoop framework will call counters only for submission in state SUCCEEDED, * it's consider exceptional state to call this method for other states. * * @param submissionId Submission internal id. * @return Submission statistics */ - public Counters stats(String submissionId) { + public Counters counters(String submissionId) { return null; } http://git-wip-us.apache.org/repos/asf/sqoop/blob/4c2a343d/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java ---------------------------------------------------------------------- diff --git a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java index 5c57758..1a144d0 100644 --- a/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java +++ b/submission/mapreduce/src/main/java/org/apache/sqoop/submission/mapreduce/MapreduceSubmissionEngine.java @@ -35,6 +35,8 @@ import org.apache.sqoop.framework.SubmissionRequest; import org.apache.sqoop.framework.SubmissionEngine; import org.apache.sqoop.job.JobConstants; import org.apache.sqoop.model.FormUtils; +import org.apache.sqoop.submission.counter.Counter; +import org.apache.sqoop.submission.counter.CounterGroup; import org.apache.sqoop.submission.counter.Counters; import org.apache.sqoop.submission.SubmissionStatus; @@ -307,9 +309,18 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { * {@inheritDoc} */ @Override - public Counters stats(String submissionId) { - //TODO(jarcec): Not supported yet - return super.stats(submissionId); + public Counters counters(String submissionId) { + try { + RunningJob runningJob = jobClient.getJob(JobID.forName(submissionId)); + if(runningJob == null) { + // Return default value + return super.counters(submissionId); + } + + return convertMapreduceCounters(runningJob.getCounters()); + } catch (IOException e) { + throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0003, e); + } } /** @@ -352,4 +363,26 @@ public class MapreduceSubmissionEngine extends SubmissionEngine { throw new SqoopException(MapreduceSubmissionError.MAPREDUCE_0004, "Unknown status " + status); } + + /** + * Convert Hadoop counters to Sqoop counters. + * + * @param hadoopCounters Hadoop counters + * @return Appropriate Sqoop counters + */ + private Counters convertMapreduceCounters(org.apache.hadoop.mapred.Counters hadoopCounters) { + Counters sqoopCounters = new Counters(); + + for(org.apache.hadoop.mapred.Counters.Group hadoopGroup : hadoopCounters) { + CounterGroup sqoopGroup = new CounterGroup(hadoopGroup.getName()); + for(org.apache.hadoop.mapred.Counters.Counter hadoopCounter : hadoopGroup) { + Counter sqoopCounter = new Counter(hadoopCounter.getName(), hadoopCounter.getValue()); + sqoopGroup.addCounter(sqoopCounter); + } + sqoopCounters.addCounterGroup(sqoopGroup); + } + + return sqoopCounters; + } + }
