Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 643559e3f -> 24b0513ce
CRUNCH-418: Add a logging directory parameter for Crunch DOT plan files. Contributed by Allan Shoup. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/24b0513c Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/24b0513c Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/24b0513c Branch: refs/heads/apache-crunch-0.8 Commit: 24b0513cea9b6493809bada7860e09f3c689e2b4 Parents: 643559e Author: Josh Wills <[email protected]> Authored: Thu Jun 19 19:39:13 2014 -0700 Committer: Josh Wills <[email protected]> Committed: Thu Jun 19 19:40:26 2014 -0700 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/MRPipelineIT.java | 29 +++++++++ .../org/apache/crunch/impl/mr/MRPipeline.java | 67 +++++++++++++++++++- .../crunch/impl/mr/plan/PlanningParameters.java | 7 ++ 3 files changed, 100 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/24b0513c/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java index 25c85c8..6af3f84 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java @@ -17,16 +17,22 @@ */ package org.apache.crunch; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileFilter; import java.io.IOException; import java.io.Serializable; +import java.net.URLEncoder; +import com.google.common.io.Files; +import org.apache.commons.io.filefilter.SuffixFileFilter; import org.apache.crunch.PipelineResult.StageResult; import org.apache.crunch.fn.FilterFns; import 
org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.io.To; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; @@ -79,5 +85,28 @@ public class MRPipelineIT implements Serializable { assertTrue(new File(outputDirA, "part-r-00000").exists()); assertTrue(new File(outputDirB, "part-r-00000").exists()); } + + @Test + public void testWritingOfDotfile() throws IOException { + File dotfileDir = Files.createTempDir(); + Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration()); + pipeline.getConfiguration().set(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR, dotfileDir.getAbsolutePath()); + + PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")); + pipeline.write( + lines.parallelDo(IdentityFn.<String>getInstance(), Writables.strings()), + To.textFile(tmpDir.getFile("output").getAbsolutePath())); + pipeline.done(); + File[] files = dotfileDir.listFiles((FileFilter)new SuffixFileFilter(".dot")); + assertEquals(1, files.length); + String fileName = files[0].getName(); + String fileNamePrefix = URLEncoder.encode(pipeline.getName(), "UTF-8"); + fileNamePrefix = (fileNamePrefix.length() < 150) ? 
fileNamePrefix : fileNamePrefix.substring(0, 150); + assertTrue("DOT file name '" + fileName + "' did not start with the pipeline name '" + fileNamePrefix + "'.", + fileName.startsWith(fileNamePrefix)); + + String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot"; + assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex)); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/24b0513c/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index 01a3ead..6cfc6d0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -18,8 +18,16 @@ package org.apache.crunch.impl.mr; import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLEncoder; +import java.text.SimpleDateFormat; +import java.util.Date; import java.util.Map; +import com.google.common.base.Charsets; +import com.google.common.collect.Maps; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CachingOptions; @@ -32,11 +40,13 @@ import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.MRCollectionFactory; import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.impl.mr.plan.MSCRPlanner; +import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.io.ReadableSource; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.hadoop.conf.Configuration; - -import com.google.common.collect.Maps; +import org.apache.hadoop.fs.FSDataOutputStream; +import 
org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; /** * Pipeline implementation that is executed within Hadoop MapReduce. @@ -120,7 +130,9 @@ public class MRPipeline extends DistributedPipeline { @Override public MRPipelineExecution runAsync() { - MRPipelineExecution res = plan().execute(); + MRExecutor mrExecutor = plan(); + writePlanDotFile(mrExecutor.getPlanDotFile()); + MRPipelineExecution res = mrExecutor.execute(); outputTargets.clear(); return res; } @@ -141,4 +153,53 @@ public class MRPipeline extends DistributedPipeline { // Identical to materialization in a MapReduce context materialize(pcollection); } + + /** + * Writes the MR job plan dot file contents to a timestamped file if the PIPELINE_DOTFILE_OUTPUT_DIR + * config key is set with an output directory. + * + * @param dotFileContents contents to be written to the dot file + */ + private void writePlanDotFile(String dotFileContents) { + String dotFileDir = getConfiguration().get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR); + if (dotFileDir != null) { + FSDataOutputStream outputStream = null; + Exception thrownException = null; + try { + URI uri = new URI(dotFileDir); + FileSystem fs = FileSystem.get(uri, getConfiguration()); + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS"); + String filenameSuffix = String.format("_%s_jobplan.dot", dateFormat.format(new Date())); + String encodedName = URLEncoder.encode(getName(), "UTF-8"); + // We limit the pipeline name to the first 150 characters to keep the output dotfile length less + // than 200, as it's not clear what the exact limits are on the filesystem we're writing to (this + // might be HDFS or it might be a local filesystem) + final int maxPipeNameLength = 150; + String filenamePrefix = encodedName.substring(0, Math.min(maxPipeNameLength, encodedName.length())); + Path jobPlanPath = new Path(uri.getPath(), filenamePrefix + filenameSuffix); + LOG.info("Writing jobplan to " + jobPlanPath); + outputStream 
= fs.create(jobPlanPath, true); + outputStream.write(dotFileContents.getBytes(Charsets.UTF_8)); + } catch (URISyntaxException e) { + thrownException = e; + throw new CrunchRuntimeException("Invalid dot file dir URI, job plan will not be written: " + dotFileDir, e); + } catch (IOException e) { + thrownException = e; + throw new CrunchRuntimeException("Error writing dotfile contents to " + dotFileDir, e); + } catch (RuntimeException e) { + thrownException = e; + throw e; + } finally { + if (outputStream != null) { + try { + outputStream.close(); + } catch (IOException e) { + if (thrownException == null) + throw new CrunchRuntimeException("Error closing dotfile", e); + } + } + } + } + } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/24b0513c/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java index cdfb46f..de89c48 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java @@ -33,6 +33,13 @@ public final class PlanningParameters { */ public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile"; + /** + * Configuration key under which a directory URI can be stored where MapReduce pipeline job plans in + * <a href="http://www.graphviz.org">DOT</a> format are stored. The dot files are only written if this configuration + * parameter is set. + */ + public static final String PIPELINE_DOTFILE_OUTPUT_DIR = "crunch.planner.dotfile.outputdir"; + public static final String JOB_NAME_MAX_STACK_LENGTH = "crunch.job.name.max.stack.length"; private PlanningParameters() {
