Updated Branches: refs/heads/master bd72e849f -> b34c2f22f
CRUNCH-182: Remove CrunchJob and merge its logic into CrunchControlledJob Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b34c2f22 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b34c2f22 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b34c2f22 Branch: refs/heads/master Commit: b34c2f22fb9543de1c4ac8158282be1776b77ce9 Parents: bd72e84 Author: Chao Shi <[email protected]> Authored: Sat Mar 23 15:18:58 2013 +0800 Committer: Chao Shi <[email protected]> Committed: Sat Mar 23 15:20:48 2013 +0800 ---------------------------------------------------------------------- .../lib/jobcontrol/CrunchControlledJob.java | 133 +++++------ .../mapreduce/lib/jobcontrol/CrunchJobControl.java | 68 ++---- .../org/apache/crunch/impl/mr/exec/CrunchJob.java | 175 --------------- .../apache/crunch/impl/mr/exec/CrunchJobHooks.java | 153 +++++++++++++ .../org/apache/crunch/impl/mr/exec/MRExecutor.java | 2 +- .../apache/crunch/impl/mr/plan/JobPrototype.java | 35 ++- .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 9 +- .../crunch/impl/mr/run/RuntimeParameters.java | 2 + .../crunch/impl/mr/exec/CrunchJobHooksTest.java | 42 ++++ .../apache/crunch/impl/mr/exec/CrunchJobTest.java | 42 ---- 10 files changed, 310 insertions(+), 351 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java index 223673e..93926c1 100644 --- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java +++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchControlledJob.java @@ -18,17 +18,18 @@ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.crunch.impl.mr.run.RuntimeParameters; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobID; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.StringUtils; +import com.google.common.base.Objects; +import com.google.common.collect.Lists; + /** * This class encapsulates a MapReduce job and its dependency. It monitors the * states of the depending jobs and updates the state of this job. A job starts @@ -46,48 +47,50 @@ public class CrunchControlledJob { SUCCESS, WAITING, RUNNING, READY, FAILED, DEPENDENT_FAILED }; - public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; - protected State state; - protected Job job; // mapreduce job to be executed. - // some info for human consumption, e.g. the reason why the job failed - protected String message; - private String controlID; // assigned and used by JobControl class + public static interface Hook { + public void run() throws IOException; + } + + private static final Log LOG = LogFactory.getLog(CrunchControlledJob.class); + + private final int jobID; + private final Job job; // mapreduce job to be executed. // the jobs the current job depends on - private List<CrunchControlledJob> dependingJobs; + private final List<CrunchControlledJob> dependingJobs; + private final Hook prepareHook; + private final Hook completionHook; + private State state; + // some info for human consumption, e.g. the reason why the job failed + private String message; + private String lastKnownProgress; /** * Construct a job. - * + * + * @param jobID + * an ID used to match with its {@link org.apache.crunch.impl.mr.plan.JobPrototype}. * @param job * a mapreduce job to be executed. - * @param dependingJobs - * an array of jobs the current job depends on + * @param prepareHook + * a piece of code that will run before this job is submitted. + * @param completionHook + * a piece of code that will run after this job gets completed. */ - public CrunchControlledJob(Job job, List<CrunchControlledJob> dependingJobs) - throws IOException { + public CrunchControlledJob(int jobID, Job job, Hook prepareHook, Hook completionHook) { + this.jobID = jobID; this.job = job; - this.dependingJobs = dependingJobs; + this.dependingJobs = Lists.newArrayList(); + this.prepareHook = prepareHook; + this.completionHook = completionHook; this.state = State.WAITING; - this.controlID = "unassigned"; this.message = "just initialized"; } - /** - * Construct a job. - * - * @param conf - * mapred job configuration representing a job to be executed. - * @throws IOException - */ - public CrunchControlledJob(Configuration conf) throws IOException { - this(new Job(conf), null); - } - @Override public String toString() { StringBuffer sb = new StringBuffer(); sb.append("job name:\t").append(this.job.getJobName()).append("\n"); - sb.append("job id:\t").append(this.controlID).append("\n"); + sb.append("job id:\t").append(this.jobID).append("\n"); sb.append("job state:\t").append(this.state).append("\n"); sb.append("job mapred id:\t").append(this.job.getJobID()).append("\n"); sb.append("job message:\t").append(this.message).append("\n"); @@ -114,7 +117,7 @@ public class CrunchControlledJob { /** * Set the job name for this job. - * + * * @param jobName * the job name */ @@ -123,20 +126,10 @@ public class CrunchControlledJob { } /** - * @return the job ID of this job assigned by JobControl - */ - public String getJobID() { - return this.controlID; - } - - /** - * Set the job ID for this job. - * - * @param id - * the job ID + * @return the job ID of this job */ - public void setJobID(String id) { - this.controlID = id; + public int getJobID() { + return this.jobID; } /** @@ -154,16 +147,6 @@ public class CrunchControlledJob { } /** - * Set the mapreduce job - * - * @param job - * the mapreduce job for this job. - */ - public synchronized void setJob(Job job) { - this.job = job; - } - - /** * @return the state of this job */ public synchronized State getJobState() { @@ -214,9 +197,6 @@ public class CrunchControlledJob { */ public synchronized boolean addDependingJob(CrunchControlledJob dependingJob) { if (this.state == State.WAITING) { // only allowed to add jobs when waiting - if (this.dependingJobs == null) { - this.dependingJobs = new ArrayList<CrunchControlledJob>(); - } return this.dependingJobs.add(dependingJob); } else { return false; @@ -246,7 +226,7 @@ public class CrunchControlledJob { * Check the state of this running job. The state may remain the same, become * SUCCEEDED or FAILED. */ - protected void checkRunningState() throws IOException, InterruptedException { + private void checkRunningState() throws IOException, InterruptedException { try { if (job.isComplete()) { if (job.isSuccessful()) { @@ -255,6 +235,11 @@ public class CrunchControlledJob { this.state = State.FAILED; this.message = "Job failed!"; } + } else { + // still running + if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) { + logJobProgress(); + } } } catch (IOException ioe) { this.state = State.FAILED; @@ -266,6 +251,9 @@ public class CrunchControlledJob { } catch (IOException e) { } } + if (isCompleted()) { + completionHook.run(); + } } /** @@ -313,26 +301,25 @@ public class CrunchControlledJob { */ protected synchronized void submit() { try { - Configuration conf = job.getConfiguration(); - if (conf.getBoolean(CREATE_DIR, false)) { - Path[] inputPaths = FileInputFormat.getInputPaths(job); - for (Path inputPath : inputPaths) { - FileSystem fs = inputPath.getFileSystem(conf); - if (!fs.exists(inputPath)) { - try { - fs.mkdirs(inputPath); - } catch (IOException e) { - - } - } - } - } + prepareHook.run(); job.submit(); this.state = State.RUNNING; + LOG.info("Running job \"" + getJobName() + "\""); + LOG.info("Job status available at: " + job.getTrackingURL()); } catch (Exception ioe) { this.state = State.FAILED; this.message = StringUtils.stringifyException(ioe); + LOG.info("Error occurred starting job \"" + getJobName() + "\":"); + LOG.info(getMessage()); } } + private void logJobProgress() throws IOException, InterruptedException { + String progress = String.format("map %.0f%% reduce %.0f%%", + 100.0 * job.mapProgress(), 100.0 * job.reduceProgress()); + if (!Objects.equal(lastKnownProgress, progress)) { + LOG.info(job.getJobName() + " progress: " + progress); + lastKnownProgress = progress; + } + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java index 0342ad4..727ab6f 100644 --- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java +++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/jobcontrol/CrunchJobControl.java @@ -19,7 +19,6 @@ package org.apache.crunch.hadoop.mapreduce.lib.jobcontrol; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -40,16 +39,15 @@ import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob.Sta */ public class CrunchJobControl { - private Map<String, CrunchControlledJob> waitingJobs; - private Map<String, CrunchControlledJob> readyJobs; - private Map<String, CrunchControlledJob> runningJobs; - private Map<String, CrunchControlledJob> successfulJobs; - private Map<String, CrunchControlledJob> failedJobs; + private Map<Integer, CrunchControlledJob> waitingJobs; + private Map<Integer, CrunchControlledJob> readyJobs; + private Map<Integer, CrunchControlledJob> runningJobs; + private Map<Integer, CrunchControlledJob> successfulJobs; + private Map<Integer, CrunchControlledJob> failedJobs; private Log log = LogFactory.getLog(CrunchJobControl.class); - private long nextJobID; - private String groupName; + private final String groupName; /** * Construct a job control for a group of jobs. @@ -58,16 +56,15 @@ public class CrunchJobControl { * a name identifying this group */ public CrunchJobControl(String groupName) { - this.waitingJobs = new Hashtable<String, CrunchControlledJob>(); - this.readyJobs = new Hashtable<String, CrunchControlledJob>(); - this.runningJobs = new Hashtable<String, CrunchControlledJob>(); - this.successfulJobs = new Hashtable<String, CrunchControlledJob>(); - this.failedJobs = new Hashtable<String, CrunchControlledJob>(); - this.nextJobID = -1; + this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>(); + this.readyJobs = new Hashtable<Integer, CrunchControlledJob>(); + this.runningJobs = new Hashtable<Integer, CrunchControlledJob>(); + this.successfulJobs = new Hashtable<Integer, CrunchControlledJob>(); + this.failedJobs = new Hashtable<Integer, CrunchControlledJob>(); this.groupName = groupName; } - private static List<CrunchControlledJob> toList(Map<String, CrunchControlledJob> jobs) { + private static List<CrunchControlledJob> toList(Map<Integer, CrunchControlledJob> jobs) { ArrayList<CrunchControlledJob> retv = new ArrayList<CrunchControlledJob>(); synchronized (jobs) { for (CrunchControlledJob job : jobs.values()) { @@ -109,25 +106,20 @@ public class CrunchJobControl { return toList(this.failedJobs); } - private String getNextJobID() { - nextJobID += 1; - return this.groupName + this.nextJobID; - } - private static void addToQueue(CrunchControlledJob aJob, - Map<String, CrunchControlledJob> queue) { + Map<Integer, CrunchControlledJob> queue) { synchronized (queue) { queue.put(aJob.getJobID(), aJob); } } private void addToQueue(CrunchControlledJob aJob) { - Map<String, CrunchControlledJob> queue = getQueue(aJob.getJobState()); + Map<Integer, CrunchControlledJob> queue = getQueue(aJob.getJobState()); addToQueue(aJob, queue); } - private Map<String, CrunchControlledJob> getQueue(State state) { - Map<String, CrunchControlledJob> retv = null; + private Map<Integer, CrunchControlledJob> getQueue(State state) { + Map<Integer, CrunchControlledJob> retv = null; if (state == State.WAITING) { retv = this.waitingJobs; } else if (state == State.READY) { @@ -148,31 +140,17 @@ public class CrunchJobControl { * @param aJob * the new job */ - synchronized public String addJob(CrunchControlledJob aJob) { - String id = this.getNextJobID(); - aJob.setJobID(id); + synchronized public void addJob(CrunchControlledJob aJob) { aJob.setJobState(State.WAITING); this.addToQueue(aJob); - return id; - } - - /** - * Add a collection of jobs - * - * @param jobs - */ - public void addJobCollection(Collection<CrunchControlledJob> jobs) { - for (CrunchControlledJob job : jobs) { - addJob(job); - } } synchronized private void checkRunningJobs() throws IOException, InterruptedException { - Map<String, CrunchControlledJob> oldJobs = null; + Map<Integer, CrunchControlledJob> oldJobs = null; oldJobs = this.runningJobs; - this.runningJobs = new Hashtable<String, CrunchControlledJob>(); + this.runningJobs = new Hashtable<Integer, CrunchControlledJob>(); for (CrunchControlledJob nextJob : oldJobs.values()) { nextJob.checkState(); @@ -182,9 +160,9 @@ public class CrunchJobControl { synchronized private void checkWaitingJobs() throws IOException, InterruptedException { - Map<String, CrunchControlledJob> oldJobs = null; + Map<Integer, CrunchControlledJob> oldJobs = null; oldJobs = this.waitingJobs; - this.waitingJobs = new Hashtable<String, CrunchControlledJob>(); + this.waitingJobs = new Hashtable<Integer, CrunchControlledJob>(); for (CrunchControlledJob nextJob : oldJobs.values()) { nextJob.checkState(); @@ -193,9 +171,9 @@ public class CrunchJobControl { } synchronized private void startReadyJobs() { - Map<String, CrunchControlledJob> oldJobs = null; + Map<Integer, CrunchControlledJob> oldJobs = null; oldJobs = this.readyJobs; - this.readyJobs = new Hashtable<String, CrunchControlledJob>(); + this.readyJobs = new Hashtable<Integer, CrunchControlledJob>(); for (CrunchControlledJob nextJob : oldJobs.values()) { // Submitting Job to Hadoop http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java deleted file mode 100644 index f0e5cd1..0000000 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJob.java +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.exec; - -import java.io.IOException; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; -import org.apache.crunch.impl.mr.plan.MSCROutputHandler; -import org.apache.crunch.impl.mr.plan.PlanningParameters; -import org.apache.crunch.impl.mr.run.RuntimeParameters; -import org.apache.crunch.io.FileNamingScheme; -import org.apache.crunch.io.PathTarget; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.base.Objects; -import com.google.common.collect.Lists; - -public class CrunchJob extends CrunchControlledJob { - - private final Log log = LogFactory.getLog(CrunchJob.class); - - private final Path workingPath; - private final Map<Integer, PathTarget> multiPaths; - private final boolean mapOnlyJob; - private String lastKnownProgress; - - public CrunchJob(Job job, Path workingPath, MSCROutputHandler handler) throws IOException { - super(job, Lists.<CrunchControlledJob> newArrayList()); - this.workingPath = workingPath; - this.multiPaths = handler.getMultiPaths(); - this.mapOnlyJob = handler.isMapOnlyJob(); - } - - private synchronized void handleMultiPaths() throws IOException { - if (!multiPaths.isEmpty()) { - // Need to handle moving the data from the output directory of the - // job to the output locations specified in the paths. - FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); - for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) { - final int i = entry.getKey(); - final Path dst = entry.getValue().getPath(); - FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme(); - - Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*"); - Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); - Configuration conf = job.getConfiguration(); - FileSystem dstFs = dst.getFileSystem(conf); - if (!dstFs.exists(dst)) { - dstFs.mkdirs(dst); - } - boolean sameFs = isCompatible(srcFs, dst); - for (Path s : srcs) { - Path d = getDestFile(conf, s, dst, fileNamingScheme); - if (sameFs) { - srcFs.rename(s, d); - } else { - FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration()); - } - } - } - } - } - - private boolean isCompatible(FileSystem fs, Path path) { - try { - fs.makeQualified(path); - return true; - } catch (IllegalArgumentException e) { - return false; - } - } - - private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme) - throws IOException { - String outputFilename = null; - if (mapOnlyJob) { - outputFilename = fileNamingScheme.getMapOutputName(conf, dir); - } else { - outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, CrunchJob.extractPartitionNumber(src.getName())); - } - if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) { - outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT; - } - return new Path(dir, outputFilename); - } - - @Override - protected void checkRunningState() throws IOException, InterruptedException { - try { - if (job.isComplete()) { - if (job.isSuccessful()) { - handleMultiPaths(); - this.state = State.SUCCESS; - } else { - this.state = State.FAILED; - this.message = "Job failed!"; - } - } else { // still running - if (job.getConfiguration().getBoolean(RuntimeParameters.LOG_JOB_PROGRESS, false)) { - logJobProgress(); - } - } - } catch (IOException ioe) { - this.state = State.FAILED; - this.message = StringUtils.stringifyException(ioe); - try { - if (job != null) { - job.killJob(); - } - } catch (IOException e) { - } - } - } - - @Override - protected synchronized void submit() { - super.submit(); - if (this.state == State.RUNNING) { - log.info("Running job \"" + getJobName() + "\""); - log.info("Job status available at: " + job.getTrackingURL()); - } else { - log.info("Error occurred starting job \"" + getJobName() + "\":"); - log.info(getMessage()); - } - } - - private void logJobProgress() throws IOException, InterruptedException { - String progress = String.format("map %.0f%% reduce %.0f%%", - 100.0 * job.mapProgress(), 100.0 * job.reduceProgress()); - if (!Objects.equal(lastKnownProgress, progress)) { - log.info(job.getJobName() + " progress: " + progress); - lastKnownProgress = progress; - } - } - - /** - * Extract the partition number from a raw reducer output filename. - * - * @param fileName The raw reducer output file name - * @return The partition number encoded in the filename - */ - static int extractPartitionNumber(String reduceOutputFileName) { - Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName); - if (matcher.find()) { - return Integer.parseInt(matcher.group(1), 10); - } else { - throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed"); - } - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java new file mode 100644 index 0000000..74bc9ac --- /dev/null +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/CrunchJobHooks.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.exec; + +import java.io.IOException; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; +import org.apache.crunch.impl.mr.plan.PlanningParameters; +import org.apache.crunch.impl.mr.run.RuntimeParameters; +import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.PathTarget; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +public final class CrunchJobHooks { + + private CrunchJobHooks() {} + + /** Creates missing input directories before job is submitted. */ + public static final class PrepareHook implements CrunchControlledJob.Hook { + private final Job job; + + public PrepareHook(Job job) { + this.job = job; + } + + @Override + public void run() throws IOException { + Configuration conf = job.getConfiguration(); + if (conf.getBoolean(RuntimeParameters.CREATE_DIR, false)) { + Path[] inputPaths = FileInputFormat.getInputPaths(job); + for (Path inputPath : inputPaths) { + FileSystem fs = inputPath.getFileSystem(conf); + if (!fs.exists(inputPath)) { + try { + fs.mkdirs(inputPath); + } catch (IOException e) { + } + } + } + } + } + } + + /** Moving output files produced by the MapReduce job to specified directories. */ + public static final class CompletionHook implements CrunchControlledJob.Hook { + private final Job job; + private final Path workingPath; + private final Map<Integer, PathTarget> multiPaths; + private final boolean mapOnlyJob; + + public CompletionHook(Job job, Path workingPath, Map<Integer, PathTarget> multiPaths, boolean mapOnlyJob) { + this.job = job; + this.workingPath = workingPath; + this.multiPaths = multiPaths; + this.mapOnlyJob = mapOnlyJob; + } + + @Override + public void run() throws IOException { + handleMultiPaths(); + } + + private synchronized void handleMultiPaths() throws IOException { + if (!multiPaths.isEmpty()) { + // Need to handle moving the data from the output directory of the + // job to the output locations specified in the paths. + FileSystem srcFs = workingPath.getFileSystem(job.getConfiguration()); + for (Map.Entry<Integer, PathTarget> entry : multiPaths.entrySet()) { + final int i = entry.getKey(); + final Path dst = entry.getValue().getPath(); + FileNamingScheme fileNamingScheme = entry.getValue().getFileNamingScheme(); + + Path src = new Path(workingPath, PlanningParameters.MULTI_OUTPUT_PREFIX + i + "-*"); + Path[] srcs = FileUtil.stat2Paths(srcFs.globStatus(src), src); + Configuration conf = job.getConfiguration(); + FileSystem dstFs = dst.getFileSystem(conf); + if (!dstFs.exists(dst)) { + dstFs.mkdirs(dst); + } + boolean sameFs = isCompatible(srcFs, dst); + for (Path s : srcs) { + Path d = getDestFile(conf, s, dst, fileNamingScheme); + if (sameFs) { + srcFs.rename(s, d); + } else { + FileUtil.copy(srcFs, s, dstFs, d, true, true, job.getConfiguration()); + } + } + } + } + } + + private boolean isCompatible(FileSystem fs, Path path) { + try { + fs.makeQualified(path); + return true; + } catch (IllegalArgumentException e) { + return false; + } + } + private Path getDestFile(Configuration conf, Path src, Path dir, FileNamingScheme fileNamingScheme) + throws IOException { + String outputFilename = null; + if (mapOnlyJob) { + outputFilename = fileNamingScheme.getMapOutputName(conf, dir); + } else { + outputFilename = fileNamingScheme.getReduceOutputName(conf, dir, extractPartitionNumber(src.getName())); + } + if (src.getName().endsWith(org.apache.avro.mapred.AvroOutputFormat.EXT)) { + outputFilename += org.apache.avro.mapred.AvroOutputFormat.EXT; + } + return new Path(dir, outputFilename); + } + } + + /** + * Extract the partition number from a raw reducer output filename. + * + * @param reduceOutputFileName The raw reducer output file name + * @return The partition number encoded in the filename + */ + static int extractPartitionNumber(String reduceOutputFileName) { + Matcher matcher = Pattern.compile(".*-r-(\\d{5})").matcher(reduceOutputFileName); + if (matcher.find()) { + return Integer.parseInt(matcher.group(1), 10); + } else { + throw new IllegalArgumentException("Reducer output name '" + reduceOutputFileName + "' cannot be parsed"); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index a784b66..4c7b7ea 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -80,7 +80,7 @@ public class MRExecutor implements PipelineExecution { : new CappedExponentialCounter(500, 10000); } - public void addJob(CrunchJob job) { + public void addJob(CrunchControlledJob job) { this.control.addJob(job); } http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index 181468f..f22b5a1 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -25,10 +25,11 @@ import java.util.Set; import org.apache.crunch.Pipeline; import org.apache.crunch.Target; +import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob; import org.apache.crunch.impl.mr.collect.DoTableImpl; import org.apache.crunch.impl.mr.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; -import org.apache.crunch.impl.mr.exec.CrunchJob; +import org.apache.crunch.impl.mr.exec.CrunchJobHooks; import org.apache.crunch.impl.mr.run.CrunchCombiner; import org.apache.crunch.impl.mr.run.CrunchInputFormat; import org.apache.crunch.impl.mr.run.CrunchMapper; @@ -49,14 +50,16 @@ import com.google.common.collect.Sets; class JobPrototype { - public static JobPrototype createMapReduceJob(PGroupedTableImpl<?, ?> group, Set<NodePath> inputs, Path workingPath) { - return new JobPrototype(inputs, group, workingPath); + public static JobPrototype createMapReduceJob(int jobID, PGroupedTableImpl<?, ?> group, + Set<NodePath> inputs, Path workingPath) { + return new JobPrototype(jobID, inputs, group, workingPath); } - public static JobPrototype createMapOnlyJob(HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) { - return new JobPrototype(mapNodePaths, workingPath); + public static JobPrototype createMapOnlyJob(int jobID, HashMultimap<Target, NodePath> mapNodePaths, Path workingPath) { + return new JobPrototype(jobID, mapNodePaths, workingPath); } + private final int jobID; // TODO: maybe stageID sounds better private final Set<NodePath> mapNodePaths; private final PGroupedTableImpl<?, ?> group; private final Set<JobPrototype> dependencies = Sets.newHashSet(); @@ -66,22 +69,28 @@ class JobPrototype { private HashMultimap<Target, NodePath> targetsToNodePaths; private DoTableImpl<?, ?> combineFnTable; - private CrunchJob job; + private CrunchControlledJob job; - private JobPrototype(Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) { + private JobPrototype(int jobID, Set<NodePath> inputs, PGroupedTableImpl<?, ?> group, Path workingPath) { + this.jobID = jobID; this.mapNodePaths = ImmutableSet.copyOf(inputs); this.group = group; this.workingPath = workingPath; this.targetsToNodePaths = null; } - private JobPrototype(HashMultimap<Target, NodePath> outputPaths, Path workingPath) { + private JobPrototype(int jobID, HashMultimap<Target, NodePath> outputPaths, Path workingPath) { + this.jobID = jobID; this.group = null; this.mapNodePaths = null; this.workingPath = workingPath; this.targetsToNodePaths = outputPaths; } + public int getJobID() { + return jobID; + } + public boolean isMapOnly() { return this.group == null; } @@ -109,7 +118,7 @@ class JobPrototype { this.dependencies.add(dependency); } - public CrunchJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException { + public CrunchControlledJob getCrunchJob(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException { if (job == null) { job = build(jarClass, conf, pipeline); for (JobPrototype proto : dependencies) { @@ -119,7 +128,7 @@ class JobPrototype { return job; } - private CrunchJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException { + private CrunchControlledJob build(Class<?> jarClass, Configuration conf, Pipeline pipeline) throws IOException { Job job = new Job(conf); conf = job.getConfiguration(); conf.set(PlanningParameters.CRUNCH_WORKING_DIRECTORY, workingPath.toString()); @@ -190,7 +199,11 @@ class JobPrototype { } job.setJobName(createJobName(pipeline.getName(), inputNodes, reduceNode)); - return new CrunchJob(job, outputPath, outputHandler); + return new CrunchControlledJob( + jobID, + job, + new CrunchJobHooks.PrepareHook(job), + new CrunchJobHooks.CompletionHook(job, outputPath, outputHandler.getMultiPaths(), group == null)); } private void serialize(List<DoNode> nodes, Configuration conf, Path workingPath, NodeContext context) http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 146bcbf..3e1de38 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -46,7 +46,8 @@ public class MSCRPlanner { private final MRPipeline pipeline; private final Map<PCollectionImpl<?>, Set<Target>> outputs; private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize; - + private int lastJobID = 0; + public MSCRPlanner(MRPipeline pipeline, Map<PCollectionImpl<?>, Set<Target>> outputs, Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize) { this.pipeline = pipeline; @@ -267,7 +268,7 @@ public class MSCRPlanner { throw new IllegalStateException("No outputs?"); } JobPrototype prototype = JobPrototype.createMapOnlyJob( - outputPaths, pipeline.createTempPath()); + ++lastJobID, outputPaths, pipeline.createTempPath()); for (Vertex v : component) { assignment.put(v, prototype); } @@ -280,7 +281,7 @@ public class MSCRPlanner { usedEdges.add(e); } JobPrototype prototype = JobPrototype.createMapReduceJob( - (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath()); + ++lastJobID, (PGroupedTableImpl) g.getPCollection(), inputs, pipeline.createTempPath()); assignment.put(g, prototype); for (Edge e : g.getIncomingEdges()) { assignment.put(e.getHead(), prototype); @@ -335,7 +336,7 @@ public class MSCRPlanner { } if (!outputPaths.isEmpty()) { JobPrototype prototype = JobPrototype.createMapOnlyJob( - outputPaths, pipeline.createTempPath()); + ++lastJobID, outputPaths, pipeline.createTempPath()); for (Vertex orphan : orphans) { assignment.put(orphan, prototype); } http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java index 1ee19e7..604c49c 100644 --- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java +++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java @@ -30,6 +30,8 @@ public class RuntimeParameters { public static final String LOG_JOB_PROGRESS = "crunch.log.job.progress"; + public static final String CREATE_DIR = "mapreduce.jobcontrol.createdir.ifnotexist"; + // Not instantiated private RuntimeParameters() { } http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java new file mode 100644 index 0000000..f03c3e2 --- /dev/null +++ b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobHooksTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.exec; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class CrunchJobHooksTest { + + @Test + public void testExtractPartitionNumber() { + assertEquals(0, CrunchJobHooks.extractPartitionNumber("out1-r-00000")); + assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010")); + assertEquals(99999, CrunchJobHooks.extractPartitionNumber("out3-r-99999")); + } + + @Test + public void testExtractPartitionNumber_WithSuffix() { + assertEquals(10, CrunchJobHooks.extractPartitionNumber("out2-r-00010.avro")); + } + + @Test(expected = IllegalArgumentException.class) + public void testExtractPartitionNumber_MapOutputFile() { + CrunchJobHooks.extractPartitionNumber("out1-m-00000"); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/b34c2f22/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java b/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java deleted file mode 100644 index 00ad830..0000000 --- a/crunch/src/test/java/org/apache/crunch/impl/mr/exec/CrunchJobTest.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.crunch.impl.mr.exec; - -import static org.junit.Assert.assertEquals; - -import org.junit.Test; - -public class CrunchJobTest { - - @Test - public void testExtractPartitionNumber() { - assertEquals(0, CrunchJob.extractPartitionNumber("out1-r-00000")); - assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010")); - assertEquals(99999, CrunchJob.extractPartitionNumber("out3-r-99999")); - } - - @Test - public void testExtractPartitionNumber_WithSuffix() { - assertEquals(10, CrunchJob.extractPartitionNumber("out2-r-00010.avro")); - } - - @Test(expected = IllegalArgumentException.class) - public void testExtractPartitionNumber_MapOutputFile() { - CrunchJob.extractPartitionNumber("out1-m-00000"); - } -}
