Updated Branches: refs/heads/master 6b3f2894e -> d864f2fd4
CRUNCH-202: Add MRPipelineExecution to expose some MR-specific APIs Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d864f2fd Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d864f2fd Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d864f2fd Branch: refs/heads/master Commit: d864f2fd4634f62b2e5684169becd9f599c6d5e3 Parents: 6b3f289 Author: Chao Shi <[email protected]> Authored: Wed May 8 13:27:07 2013 +0800 Committer: Chao Shi <[email protected]> Committed: Wed May 8 13:27:07 2013 +0800 ---------------------------------------------------------------------- .../lib/jobcontrol/CrunchControlledJob.java | 36 +++++------- .../mapreduce/lib/jobcontrol/CrunchJobControl.java | 16 +++++- .../main/java/org/apache/crunch/impl/mr/MRJob.java | 45 +++++++++++++++ .../java/org/apache/crunch/impl/mr/MRPipeline.java | 4 +- .../apache/crunch/impl/mr/MRPipelineExecution.java | 28 +++++++++ .../org/apache/crunch/impl/mr/exec/MRExecutor.java | 18 +++++- .../apache/crunch/impl/mr/plan/DotfileWriter.java | 7 +- 7 files changed, 125 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index 93926c1..0038ab7 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -22,11 +22,13 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.crunch.impl.mr.MRJob; import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Function; import com.google.common.base.Objects; import com.google.common.collect.Lists; @@ -34,18 +36,13 @@ import com.google.common.collect.Lists; * This class encapsulates a MapReduce job and its dependency. It monitors the * states of the depending jobs and updates the state of this job. A job starts * in the WAITING state. If it does not have any depending jobs, or all of the - * depending jobs are in SUCCEEDED state, then the job state will become READY. If + * depending jobs are in SUCCESS state, then the job state will become READY. If * any depending jobs fail, the job will fail too. When in READY state, the job * can be submitted to Hadoop for execution, with the state changing into * RUNNING state. From RUNNING state, the job can get into SUCCEEDED or FAILED * state, depending the status of the job execution. */ -public class CrunchControlledJob { - - // A job will be in one of the following states - public static enum State { - SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED - }; +public class CrunchControlledJob implements MRJob { public static interface Hook { public void run() throws IOException; @@ -139,16 +136,22 @@ public class CrunchControlledJob { return this.job.getJobID(); } - /** - * @return the mapreduce job - */ + @Override public synchronized Job getJob() { return this.job; } - /** - * @return the state of this job - */ + @Override + public List<MRJob> getDependentJobs() { + return Lists.transform(dependingJobs, new Function<CrunchControlledJob, MRJob>() { + @Override + public MRJob apply(CrunchControlledJob job) { + return job; + } + }); + } + + @Override public synchronized State getJobState() { return this.state; } @@ -181,13 +184,6 @@ public class CrunchControlledJob { } /** - * @return the depending jobs of this job - */ - public List<CrunchControlledJob> getDependentJobs() { - return this.dependingJobs; - } - - /** * Add a job to this jobs' dependency list. Dependent jobs can only be added * while a Job is waiting to run, not during or afterwards. * http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index 727ab6f..47cfb94 100644 --- a/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch-core/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -23,9 +23,10 @@ import java.util.Hashtable; import java.util.List; import java.util.Map; +import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.State; +import org.apache.crunch.impl.mr.MRJob.State; /** * This class encapsulates a set of MapReduce jobs and its dependency. @@ -106,6 +107,19 @@ public class CrunchJobControl { return toList(this.failedJobs); } + /** + * @return the jobs in all states + */ + public synchronized List<CrunchControlledJob> getAllJobs() { + return ImmutableList.<CrunchControlledJob>builder() + .addAll(waitingJobs.values()) + .addAll(readyJobs.values()) + .addAll(runningJobs.values()) + .addAll(successfulJobs.values()) + .addAll(failedJobs.values()) + .build(); + } + private static void addToQueue(CrunchControlledJob aJob, Map<Integer, CrunchControlledJob> queue) { synchronized (queue) { http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java new file mode 100644 index 0000000..188b556 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRJob.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr; + +import org.apache.hadoop.mapreduce.Job; + +import java.util.List; + +/** + * A Hadoop MapReduce job managed by Crunch. + */ +public interface MRJob { + + /** A job will be in one of the following states. */ + public static enum State { + SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED + }; + + /** @return the Job ID assigned by Crunch */ + int getJobID(); + + /** @return the internal Hadoop MapReduce job */ + Job getJob(); + + /** @return the depending jobs of this job */ + List<MRJob> getDependentJobs(); + + /** @return the state of this job */ + State getJobState(); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 00cf486..f167846 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -168,8 +168,8 @@ public class MRPipeline implements Pipeline { } @Override - public PipelineExecution runAsync() { - PipelineExecution res = plan().execute(); + public MRPipelineExecution runAsync() { + MRPipelineExecution res = plan().execute(); outputTargets.clear(); return res; } http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java new file mode 100644 index 0000000..b9d53fe --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr; + +import org.apache.crunch.PipelineExecution; + +import java.util.List; + +public interface MRPipelineExecution extends PipelineExecution { + + List<MRJob> getJobs(); + +} http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 4c7b7ea..9318271 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -27,16 +27,18 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.crunch.PipelineExecution; import org.apache.crunch.PipelineResult; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl; +import org.apache.crunch.impl.mr.MRJob; +import org.apache.crunch.impl.mr.MRPipelineExecution; import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.hadoop.conf.Configuration; +import com.google.common.base.Function; import com.google.common.collect.Lists; /** @@ -48,7 +50,7 @@ import com.google.common.collect.Lists; * * It is thread-safe. */ -public class MRExecutor implements PipelineExecution { +public class MRExecutor implements MRPipelineExecution { private static final Log LOG = LogFactory.getLog(MRExecutor.class); @@ -88,7 +90,7 @@ public class MRExecutor implements PipelineExecution { this.planDotFile = planDotFile; } - public PipelineExecution execute() { + public MRPipelineExecution execute() { monitorThread.start(); return this; } @@ -195,4 +197,14 @@ public class MRExecutor implements PipelineExecution { conf.get("mapred.job.tracker", "local")); return "local".equals(jobTrackerAddress); } + + @Override + public List<MRJob> getJobs() { + return Lists.transform(control.getAllJobs(), new Function<CrunchControlledJob, MRJob>() { + @Override + public MRJob apply(CrunchControlledJob job) { + return job; + } + }); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d864f2fd/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java index 46d8c53..9541b99 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java @@ -201,19 +201,20 @@ public class DotfileWriter { public String buildDotfile() { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("digraph G {\n"); - int clusterIndex = 0; for (String globalDeclaration : globalNodeDeclarations) { stringBuilder.append(String.format(" %s\n", globalDeclaration)); } for (JobPrototype jobPrototype : jobPrototypes){ + // Must prefix subgraph name with "cluster", otherwise its border won't render. I don't know why. StringBuilder jobProtoStringBuilder = new StringBuilder(); - jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++)); + jobProtoStringBuilder.append(String.format(" subgraph \"cluster-job%d\" {\n", jobPrototype.getJobID())); for (MRTaskType taskType : MRTaskType.values()){ Pair<JobPrototype,MRTaskType> jobTaskKey = Pair.of(jobPrototype, taskType); if (jobNodeDeclarations.containsKey(jobTaskKey)){ - jobProtoStringBuilder.append(String.format(" subgraph cluster%d {\n", clusterIndex++)); + jobProtoStringBuilder.append(String.format( + " subgraph \"cluster-job%d-%s\" {\n", jobPrototype.getJobID(), taskType.name().toLowerCase())); jobProtoStringBuilder.append(String.format(" %s\n", getTaskGraphAttributes(taskType))); for (String declarationEntry : jobNodeDeclarations.get(jobTaskKey)){ jobProtoStringBuilder.append(String.format(" %s\n", declarationEntry));
