CRUNCH-438: Visualizations of some important internal/intermediate pipeline planning states
Signed-off-by: tzolov <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/6543ded5 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/6543ded5 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/6543ded5 Branch: refs/heads/master Commit: 6543ded556dbbd4e8e25e59730f7f5fa8e2c3c26 Parents: 3088d82 Author: tzolov <[email protected]> Authored: Thu Jan 8 21:50:37 2015 +0100 Committer: tzolov <[email protected]> Committed: Fri Jan 9 00:35:37 2015 +0100 ---------------------------------------------------------------------- .gitignore | 1 + .../apache/crunch/impl/mr/plan/DotfilesIT.java | 201 +++++++++++++ .../org/apache/crunch/PipelineExecution.java | 7 + .../org/apache/crunch/impl/mem/MemPipeline.java | 10 + .../org/apache/crunch/impl/mr/MRPipeline.java | 10 +- .../apache/crunch/impl/mr/exec/MRExecutor.java | 26 +- .../impl/mr/plan/CommonDotfileWriter.java | 176 +++++++++++ .../crunch/impl/mr/plan/DotfileUtills.java | 178 ++++++++++++ .../crunch/impl/mr/plan/DotfileWriterGraph.java | 156 ++++++++++ .../plan/DotfileWriterPCollectionLineage.java | 87 ++++++ .../impl/mr/plan/DotfileWriterRTNodes.java | 291 +++++++++++++++++++ .../crunch/impl/mr/plan/JobPrototype.java | 2 - .../apache/crunch/impl/mr/plan/MSCRPlanner.java | 43 ++- .../crunch/impl/mr/plan/PlanningParameters.java | 2 + .../crunch/impl/mr/run/CrunchOutputFormat.java | 54 ---- .../org/apache/crunch/impl/mr/run/RTNode.java | 33 +++ .../org/apache/crunch/io/CrunchOutputs.java | 257 ++++------------ .../apache/crunch/impl/spark/SparkRuntime.java | 7 + src/main/config/checkstyle.xml | 1 - 19 files changed, 1258 insertions(+), 284 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index d5149ce..d4b1ab9 100644 
--- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ target *.iml .idea +.checkstyle http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java new file mode 100644 index 0000000..bfb1258 --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/impl/mr/plan/DotfilesIT.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.plan; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FilenameFilter; +import java.io.IOException; +import java.nio.charset.Charset; +import java.util.List; + +import org.apache.crunch.DoFn; +import org.apache.crunch.Emitter; +import org.apache.crunch.PCollection; +import org.apache.crunch.PTable; +import org.apache.crunch.Pipeline; +import org.apache.crunch.PipelineResult; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.At; +import org.apache.crunch.lib.Aggregate; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.PTypeFamily; +import org.apache.crunch.types.writable.WritableTypeFamily; +import org.apache.hadoop.conf.Configuration; +import org.junit.Rule; +import org.junit.Test; + +import com.google.common.io.Files; + +public class DotfilesIT { + @Rule + public TemporaryPath tmpDir = TemporaryPaths.create(); + + @Rule + public TemporaryPath dotfileDir = TemporaryPaths.create(); + + enum WordCountStats { + ANDS + } + + public static PTable<String, Long> wordCount(PCollection<String> words, PTypeFamily typeFamily) { + return Aggregate.count(words.parallelDo(new DoFn<String, String>() { + + @Override + public void process(String line, Emitter<String> emitter) { + for (String word : line.split("\\s+")) { + emitter.emit(word); + if ("and".equals(word)) { + increment(WordCountStats.ANDS); + } + } + } + }, typeFamily.strings())); + } + + @Test + public void testPlanDotfileWithOutputDir() throws Throwable { + + Configuration conf = tmpDir.getDefaultConfiguration(); + + DotfileUtills.setPipelineDotfileOutputDir(conf, dotfileDir.getRootFileName()); + + run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance()); + + String[] dotfileNames = dotfileNames(dotfileDir.getRootFile()); + + assertEquals(1, dotfileNames.length); + + 
assertTrue(containsFileEndingWith(dotfileNames, "jobplan.dot")); + + assertTrue("PlanDotfile should always be present in the Configuration", + conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0); + } + + @Test + public void testPlanDotfileWithoutOutputDir() throws Throwable { + + Configuration conf = tmpDir.getDefaultConfiguration(); + + run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance()); + + String[] dotfileNames = dotfileNames(dotfileDir.getRootFile()); + + assertEquals(0, dotfileNames.length); + assertTrue("PlanDotfile should always be present in the Configuration", + conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0); + } + + @Test + public void testDebugDotfiles() throws Throwable { + + Configuration conf = tmpDir.getDefaultConfiguration(); + + DotfileUtills.setPipelineDotfileOutputDir(conf, dotfileDir.getRootFileName()); + DotfileUtills.enableDebugDotfiles(conf); + + run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance()); + + String[] dotfileNames = dotfileNames(dotfileDir.getRootFile()); + + assertEquals(5, dotfileNames.length); + + assertTrue(containsFileEndingWith(dotfileNames, "jobplan.dot")); + assertTrue(containsFileEndingWith(dotfileNames, "split_graph_plan.dot")); + assertTrue(containsFileEndingWith(dotfileNames, "rt_plan.dot")); + assertTrue(containsFileEndingWith(dotfileNames, "base_graph_plan.dot")); + assertTrue(containsFileEndingWith(dotfileNames, "lineage_plan.dot")); + + assertTrue("PlanDotfile should always be present in the Configuration", + conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0); + } + + @Test + public void testDebugDotfilesEnabledButNoOutputDirSet() throws Throwable { + + Configuration conf = tmpDir.getDefaultConfiguration(); + + DotfileUtills.enableDebugDotfiles(conf); + + run(new MRPipeline(DotfilesIT.class, conf), WritableTypeFamily.getInstance()); + + String[] dotfileNames = dotfileNames(dotfileDir.getRootFile()); + + assertEquals(0, 
dotfileNames.length); + + assertTrue("PlanDotfile should always be present in the Configuration", + conf.get(PlanningParameters.PIPELINE_PLAN_DOTFILE).length() > 0); + } + + public void run(Pipeline pipeline, PTypeFamily typeFamily) throws IOException { + String inputPath = tmpDir.copyResourceFileName("shakes.txt"); + String outputPath = tmpDir.getFileName("output"); + + PCollection<String> shakespeare = pipeline.read(At.textFile(inputPath, typeFamily.strings())); + PTable<String, Long> wordCount = wordCount(shakespeare, typeFamily); + + pipeline.writeTextFile(wordCount, outputPath); + + PipelineResult res = pipeline.done(); + assertTrue(res.succeeded()); + List<PipelineResult.StageResult> stageResults = res.getStageResults(); + + assertEquals(1, stageResults.size()); + assertEquals(427, stageResults.get(0).getCounterValue(WordCountStats.ANDS)); + + File outputFile = new File(outputPath, "part-r-00000"); + List<String> lines = Files.readLines(outputFile, Charset.defaultCharset()); + boolean passed = false; + for (String line : lines) { + if (line.startsWith("Macbeth\t28") || line.startsWith("[Macbeth,28]")) { + passed = true; + break; + } + } + assertTrue(passed); + } + + private boolean containsFileEndingWith(String[] fileNames, String suffix) { + for (String fn : fileNames) { + if (fn.endsWith(suffix)) + return true; + } + return false; + } + + private String[] dotfileNames(File rootDir) { + + File[] dotfileFiles = rootDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith(".dot"); + } + }); + + String[] fileNames = new String[dotfileFiles.length]; + int i = 0; + for (File file : dotfileFiles) { + fileNames[i++] = file.getName(); + } + + return fileNames; + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java ---------------------------------------------------------------------- diff --git 
a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java index af6a177..b456d45 100644 --- a/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java +++ b/crunch-core/src/main/java/org/apache/crunch/PipelineExecution.java @@ -19,6 +19,7 @@ package org.apache.crunch; import com.google.common.util.concurrent.ListenableFuture; +import java.util.Map; import java.util.concurrent.TimeUnit; /** @@ -35,6 +36,12 @@ public interface PipelineExecution extends ListenableFuture<PipelineResult> { */ String getPlanDotFile(); + /** + * Returns all .dot files that allows a client to graph the Crunch execution plan internals. + * Key is the name of the dot file and the value is the file itself + */ + Map<String, String> getNamedDotFiles(); + /** Blocks until pipeline completes or the specified waiting time elapsed. */ void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException; http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 23b9e04..49e5662 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -19,12 +19,15 @@ package org.apache.crunch.impl.mem; import java.io.IOException; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import com.google.common.base.Charsets; + import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import 
org.apache.crunch.CachingOptions; @@ -60,9 +63,11 @@ import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.mapreduce.Counters; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -390,6 +395,11 @@ public class MemPipeline implements Pipeline { } @Override + public Map<String, String> getNamedDotFiles() { + return ImmutableMap.of("", ""); + } + + @Override public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { set(res); } http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java index d23988b..6cd2809 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java @@ -24,9 +24,11 @@ import java.net.URLEncoder; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Map; +import java.util.Map.Entry; import com.google.common.base.Charsets; import com.google.common.collect.Maps; + import org.apache.crunch.CachingOptions; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.PCollection; @@ -130,7 +132,9 @@ public class MRPipeline extends DistributedPipeline { @Override public MRPipelineExecution runAsync() { MRExecutor mrExecutor = plan(); - writePlanDotFile(mrExecutor.getPlanDotFile()); + for (Entry<String, String> dotEntry: mrExecutor.getNamedDotFiles().entrySet()){ + writePlanDotFile(dotEntry.getKey(), dotEntry.getValue()); + } MRPipelineExecution res = mrExecutor.execute(); 
outputTargets.clear(); return res; @@ -159,7 +163,7 @@ public class MRPipeline extends DistributedPipeline { * * @param dotFileContents contents to be written to the dot file */ - private void writePlanDotFile(String dotFileContents) { + private void writePlanDotFile(String fileName, String dotFileContents) { String dotFileDir = getConfiguration().get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR); if (dotFileDir != null) { FSDataOutputStream outputStream = null; @@ -168,7 +172,7 @@ public class MRPipeline extends DistributedPipeline { URI uri = new URI(dotFileDir); FileSystem fs = FileSystem.get(uri, getConfiguration()); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS"); - String filenameSuffix = String.format("_%s_jobplan.dot", dateFormat.format(new Date())); + String filenameSuffix = String.format("_%s_%s.dot", dateFormat.format(new Date()), fileName); String encodedName = URLEncoder.encode(getName(), "UTF-8"); // We limit the pipeline name to the first 150 characters to keep the output dotfile length less // than 200, as it's not clear what the exact limits are on the filesystem we're writing to (this http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java index 024fcce..63d0c5d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java @@ -18,8 +18,10 @@ package org.apache.crunch.impl.mr.exec; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.util.concurrent.AbstractFuture; + import org.apache.crunch.PipelineCallable; 
import org.apache.crunch.PipelineResult; import org.apache.crunch.SourceTarget; @@ -37,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -68,7 +71,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe private Thread monitorThread; private boolean started; - private String planDotFile; + private Map<String, String> namedDotFiles; public MRExecutor( Configuration conf, @@ -90,16 +93,28 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe this.pollInterval = isLocalMode() ? new CappedExponentialCounter(50, 1000) : new CappedExponentialCounter(500, 10000); + + this.namedDotFiles = new ConcurrentHashMap<String, String>(); } public void addJob(CrunchControlledJob job) { this.control.addJob(job); } - public void setPlanDotFile(String planDotFile) { - this.planDotFile = planDotFile; + public void addNamedDotFile(String fileName, String planDotFile) { + this.namedDotFiles.put(fileName, planDotFile); + } + + @Override + public String getPlanDotFile() { + return this.namedDotFiles.get("jobplan"); } + @Override + public Map<String, String> getNamedDotFiles() { + return ImmutableMap.copyOf(this.namedDotFiles); + } + public synchronized MRPipelineExecution execute() { if (!started) { monitorThread.start(); @@ -190,11 +205,6 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe } @Override - public String getPlanDotFile() { - return planDotFile; - } - - @Override public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { doneSignal.await(timeout, timeUnit); } http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java 
---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java new file mode 100644 index 0000000..a3199ba --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/CommonDotfileWriter.java @@ -0,0 +1,176 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.plan; + +import static java.lang.String.format; +import static org.apache.commons.collections.CollectionUtils.isEmpty; + +import java.util.ArrayList; + +import org.apache.crunch.Source; +import org.apache.crunch.Target; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.types.PType; + +import com.google.common.base.Joiner; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; + +/** + * Common Debug dotfile writer class. Provides the draw abstraction common for all debug dotfile writers. 
+ */ +@SuppressWarnings({ "rawtypes" }) +public abstract class CommonDotfileWriter { + + protected static final String DEFAULT_FOLDER_COLOR = "darkGreen"; + + protected static final String[] COLORS = new String[] { "blue", "red", "green", "yellow", "cyan", "darkGray", "gray", + "magenta", "darkGreen", "black" }; + + protected StringBuilder edgeBuilder = null; + protected StringBuilder contentBuilder = null; + + protected String label(String text) { + return text == null ? "-" : text; + } + + protected String className(Object obj) { + + if (obj == null) { + return "-"; + } + + Class clazz = null; + if (obj instanceof Class) { + clazz = (Class) obj; + } else { + clazz = obj.getClass(); + } + String s = clazz.getName(); + s = s.substring(s.lastIndexOf('.') + 1); + + return s; + } + + protected String getPCollectionID(PCollectionImpl<?> pcollectionImpl) { + return String.format("\"%s@%d\"", pcollectionImpl.getName(), pcollectionImpl.hashCode()); + } + + protected String formatPCollection(PCollectionImpl<?> pcollectionImpl) { + + String withBreakpoint = pcollectionImpl.isBreakpoint() ? 
" [breakpoint]" : ""; + + return String.format("%s [label=\"{%s | %s | %s }\", shape=%s, color=%s];\n", getPCollectionID(pcollectionImpl), + pcollectionImpl.getName(), className(pcollectionImpl) + withBreakpoint, + formatPType(pcollectionImpl.getPType()), "record", "black"); + } + + protected String formatPType(PType ptype) { + + StringBuilder sb = new StringBuilder(); + + sb.append(className(ptype.getTypeClass())); + + if (!isEmpty(ptype.getSubTypes())) { + + ArrayList<String> subtypes = Lists.newArrayList(); + for (Object subType : ptype.getSubTypes()) { + if (subType instanceof PType) { + subtypes.add(formatPType((PType) subType)); + } else { + subtypes.add(className(subType)); + } + } + + sb.append("[").append(Joiner.on(", ").join(subtypes)).append("]"); + } + + return sb.toString(); + } + + private String getSourceID(Source s) { + return "\"ST@" + s + "\""; + } + + private String getTargetID(Target t) { + return "\"ST@" + t + "\""; + } + + protected void formatTarget(Target target, String color) { + contentBuilder.append(String.format("%s [label=\"%s\", shape=folder, color=\"%s\"];\n", getTargetID(target), + target.toString(), color)); + } + + protected void formatSource(Source source, String color) { + contentBuilder.append(String.format("%s [label=\"%s\", shape=folder, color=\"%s\"];\n", getSourceID(source), + source.toString(), color)); + } + + protected void link(String from, String to, String color) { + edgeBuilder.append(String.format("%s -> %s [color=\"%s\"];\n", from, to, color)); + } + + protected void link(PCollectionImpl pc, Target target, String color) { + link(getPCollectionID(pc), getTargetID(target), color); + } + + protected void link(PCollectionImpl parent, PCollectionImpl child, String color) { + link(getPCollectionID(parent), getPCollectionID(child), color); + } + + protected void link(Source source, PCollectionImpl pc, String color) { + link(getSourceID(source), getPCollectionID(pc), color); + } + + public String buildDiagram(String 
diagramName) { + + edgeBuilder = new StringBuilder(); + contentBuilder = new StringBuilder(); + + contentBuilder.append("digraph G {\n"); + contentBuilder.append(format(" label=\"%s \\n\\n\"; fontsize=24; labelloc=\"t\"; \n", diagramName)); + + contentBuilder.append(getLgentd()); + + try { + doBuildDiagram(); + } catch (Throwable t) { + contentBuilder.append("\"" + Throwables.getRootCause(t) + "\""); + } + + contentBuilder.append(edgeBuilder); + contentBuilder.append("}\n"); + + return contentBuilder.toString(); + } + + public String getLgentd() { + StringBuilder lsb = new StringBuilder(); + lsb.append("subgraph \"cluster-legend-rtnodes\" {\n").append( + "label=\"LEGEND\" ; fontsize=10; style=filled; color=lightblue;\n"); + + doGetLegend(lsb); + + lsb.append("}\n"); + return lsb.toString(); + } + + protected abstract void doBuildDiagram(); + + protected abstract void doGetLegend(StringBuilder lsb); +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java new file mode 100644 index 0000000..dc372f5 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileUtills.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mr.plan; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.crunch.Target; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.impl.mr.exec.MRExecutor; +import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; + +/** + * Helper class that manages the dotfile generation lifecycle and configuring the dotfile debug context. + */ +public class DotfileUtills { + + private static final Logger LOG = LoggerFactory.getLogger(DotfileUtills.class); + + private final Class<?> jarClass; + private final Configuration conf; + + private String rtNodesDotfile = ""; + private String basePlanGraphDotfile = ""; + private String splitGraphPlanDotfile = ""; + private String pcollectionLineageDotfile = ""; + private String planDotFile = ""; + + DotfileUtills(Class<?> jarClass, Configuration conf) { + this.jarClass = jarClass; + this.conf = conf; + } + + /** + * Builds a lineage dotfile only if the dotfile-debug mode is enabled. 
+ */ + void buildLineageDotfile(Map<PCollectionImpl<?>, Set<Target>> outputs) { + + if (isDebugDotfilesEnabled(conf)) { + try { + pcollectionLineageDotfile = new DotfileWriterPCollectionLineage(outputs) + .buildDiagram("PCollection Lineage Plan (" + jarClass.getSimpleName() + ")"); + } catch (Exception ex) { + LOG.error("Problem creating debug dotfile:", ex); + } + } + } + + /** + * Builds the base graph dotfile only if the dotfile-debug mode is enabled. + */ + void buildBaseGraphDotfile(Map<PCollectionImpl<?>, Set<Target>> outputs, Graph graph) { + + if (isDebugDotfilesEnabled(conf)) { + try { + basePlanGraphDotfile = new DotfileWriterGraph(graph, outputs, null).buildDiagram("Base Graph (" + + jarClass.getSimpleName() + ")"); + } catch (Exception ex) { + LOG.error("Problem creating debug dotfile:", ex); + } + } + } + + /** + * Builds a splitted graph dotfile only if the dotfile-debug mode is enabled. + */ + void buildSplitGraphDotfile(Map<PCollectionImpl<?>, Set<Target>> outputs, Graph graph, List<List<Vertex>> components) { + + if (isDebugDotfilesEnabled(conf)) { + try { + splitGraphPlanDotfile = new DotfileWriterGraph(graph, outputs, null).buildDiagram("Graph With Components (" + + jarClass.getSimpleName() + ")"); + + } catch (Exception ex) { + LOG.error("Problem creating debug dotfile:", ex); + } + } + } + + /** + * Builds a RT node dotfile only if the dotfile-debug mode is enabled. + */ + void buildRTNodesDotfile(MRExecutor exec) { + if (isDebugDotfilesEnabled(conf)) { + try { + rtNodesDotfile = new DotfileWriterRTNodes(exec.getJobs()).buildDiagram("Run Time Plan (" + + jarClass.getSimpleName() + ")"); + } catch (Exception ex) { + LOG.error("Problem creating debug dotfile:", ex); + } + } + } + + /** + * Build the plan dotfile despite of the the dotfile-debug mode. 
+ * + * @throws IOException + */ + void buildPlanDotfile(MRExecutor exec, Multimap<Target, JobPrototype> assignments, MRPipeline pipeline, int lastJobID) { + try { + DotfileWriter dotfileWriter = new DotfileWriter(); + + for (JobPrototype proto : Sets.newHashSet(assignments.values())) { + dotfileWriter.addJobPrototype(proto); + } + + planDotFile = dotfileWriter.buildDotfile(); + + } catch (Exception ex) { + LOG.error("Problem creating debug dotfile:", ex); + } + } + + /** + * Attach the generated dotfiles to the {@link MRExecutor} context!. Note that the planDotFile is always added! + */ + void addDotfilesToContext(MRExecutor exec) { + try { + // The job plan is always enabled and set in the Configuration; + conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile); + exec.addNamedDotFile("jobplan", planDotFile); + + // Debug dotfiles are only stored if the configuration is set to enabled + if (isDebugDotfilesEnabled(conf)) { + exec.addNamedDotFile("rt_plan", rtNodesDotfile); + exec.addNamedDotFile("base_graph_plan", basePlanGraphDotfile); + exec.addNamedDotFile("split_graph_plan", splitGraphPlanDotfile); + exec.addNamedDotFile("lineage_plan", pcollectionLineageDotfile); + } + } catch (Exception ex) { + LOG.error("Problem creating debug dotfile:", ex); + } + } + + public static boolean isDebugDotfilesEnabled(Configuration conf) { + return conf.getBoolean(PlanningParameters.DEBUG_DOTFILES_ENABLED, false) + && conf.get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR) != null; + } + + public static void enableDebugDotfiles(Configuration conf) { + conf.setBoolean(PlanningParameters.DEBUG_DOTFILES_ENABLED, true); + } + + public static void disableDebugDotfilesEnabled(Configuration conf) { + conf.setBoolean(PlanningParameters.DEBUG_DOTFILES_ENABLED, false); + } + + public static void setPipelineDotfileOutputDir(Configuration conf, String outputDir) { + conf.set(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR, outputDir); + } + + public static String 
getPipelineDotfileOutputDir(Configuration conf) { + return conf.get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java new file mode 100644 index 0000000..387f4bb --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterGraph.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.plan; + +import static java.lang.String.format; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.crunch.Source; +import org.apache.crunch.Target; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; + +import com.google.common.base.Joiner; +import com.google.common.collect.Maps; + +@SuppressWarnings("rawtypes") +public class DotfileWriterGraph extends CommonDotfileWriter { + + private Graph graph; + private Map<PCollectionImpl<?>, Set<Target>> outputTargets; + private List<List<Vertex>> components; + + public DotfileWriterGraph(Graph graph, Map<PCollectionImpl<?>, Set<Target>> outputTargets, + List<List<Vertex>> components) { + super(); + this.graph = graph; + this.outputTargets = outputTargets; + this.components = components; + } + + private String formatVertex(Vertex v) { + return formatPCollection(v.getPCollection()); + } + + private String formatNodePaths(Set<NodePath> nodePaths) { + ArrayList<String> path = new ArrayList<String>(); + for (NodePath np : nodePaths) { + path.add(Joiner.on(", \\l").join(np) + " \\l"); + } + return format("%s", Joiner.on(" | \\n").join(path)); + } + + private void link(Edge e) { + edgeBuilder.append(String.format("%s -> %s [label=\"%s\", labeljust=r, color=\"%s\"];\n", getPCollectionID(e.getHead() + .getPCollection()), getPCollectionID(e.getTail().getPCollection()), formatNodePaths(e.getNodePaths()), "black")); + } + + private void link(Source source, Vertex v, String color) { + link(source, v.getPCollection(), color); + } + + private void link(Vertex v, Target target, String color) { + link(v.getPCollection(), target, color); + } + + private class ComponentContentBuilder { + + private Map<List<Vertex>, StringBuilder> contentBuilderMap = Maps.newHashMap(); + private StringBuilder topContentBuilder; + + public 
ComponentContentBuilder(StringBuilder contentBuilder, List<List<Vertex>> components) { + this.topContentBuilder = contentBuilder; + if (!CollectionUtils.isEmpty(components)) { + for (List<Vertex> vl : components) { + contentBuilderMap.put(vl, new StringBuilder()); + } + } + } + + private StringBuilder getContentBuilder(Vertex v) { + for (Entry<List<Vertex>, StringBuilder> entry : contentBuilderMap.entrySet()) { + if (entry.getKey().contains(v)) { + return entry.getValue(); + } + } + return topContentBuilder; + } + + public void append(Vertex v) { + this.getContentBuilder(v).append(formatVertex(v)); + } + + public StringBuilder build() { + int index = 0; + for (Entry<List<Vertex>, StringBuilder> entry : contentBuilderMap.entrySet()) { + topContentBuilder.append("subgraph \"cluster-component" + index + "\" {\n"); + topContentBuilder.append(format( + " label=\"Component%s\"; fontsize=14; graph[style=dotted]; fontcolor=red color=red; \n", index)); + topContentBuilder.append(entry.getValue()); + topContentBuilder.append("}\n"); + index++; + } + return topContentBuilder; + } + } + + @Override + protected void doGetLegend(StringBuilder lsb) { + lsb.append(" \"Folder\" [label=\"Folder Name\", fontsize=10, shape=folder, color=darkGreen]\n") + .append(" \"Vertex1\" [label=\"{Vertex Name | Vertex PCollection | PType }\", fontsize=10, shape=record]\n") + .append(" subgraph \"cluster-component-legend\" {\n") + .append(" label=\"Component1\" fontsize=14 graph[style=dotted] fontcolor=red color=red\n") + .append( + " \"Vertex2\" [label=\"{Vertex Name | Vertex PCollection | PType }\", fontsize=10, shape=record]\n") + .append(" }\n").append(" \"Vertex1\" -> \"Vertex2\" [label=\"Path List\", fontsize=10];\n"); + } + + @Override + public void doBuildDiagram() { + + ComponentContentBuilder componentContentBuilder = new ComponentContentBuilder(contentBuilder, components); + + for (Vertex v : graph) { + componentContentBuilder.append(v); + + Source source = v.getSource(); + if (source != 
null) { + formatSource(source, DEFAULT_FOLDER_COLOR); + link(source, v, DEFAULT_FOLDER_COLOR); + } + + if (v.isOutput() && outputTargets != null) { + for (Target target2 : outputTargets.get(v.getPCollection())) { + formatTarget(target2, DEFAULT_FOLDER_COLOR); + link(v, target2, DEFAULT_FOLDER_COLOR); + } + } + } + + contentBuilder = componentContentBuilder.build(); + + for (Edge e : graph.getAllEdges()) { + link(e); + } + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java new file mode 100644 index 0000000..9d54de9 --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterPCollectionLineage.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.plan; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.crunch.Target; +import org.apache.crunch.impl.dist.collect.PCollectionImpl; +import org.apache.crunch.impl.mr.collect.InputCollection; + +/** + * Writes <a href="http://www.graphviz.org">Graphviz</a> dot files to illustrate the topology of Crunch pipelines. + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class DotfileWriterPCollectionLineage extends CommonDotfileWriter { + + private Map<PCollectionImpl<?>, Set<Target>> outputs; + + public DotfileWriterPCollectionLineage(Map<PCollectionImpl<?>, Set<Target>> outputs) { + super(); + this.outputs = outputs; + } + + private void formatPCollectionLineage(PCollectionImpl pcollection, String color) { + + contentBuilder.append(formatPCollection(pcollection)); + + // for input pcollections add the related source and link it to the collection + if (pcollection instanceof InputCollection) { + InputCollection ic = (InputCollection) pcollection; + + formatSource(ic.getSource(), DEFAULT_FOLDER_COLOR); + + link(ic.getSource(), pcollection, color); + } + + List<PCollectionImpl<?>> parents = pcollection.getParents(); + if (!CollectionUtils.isEmpty(parents)) { + for (PCollectionImpl parentPCollection : parents) { + link(parentPCollection, pcollection, color); + formatPCollectionLineage(parentPCollection, color); + } + } + } + + @Override + protected void doBuildDiagram() { + + int outputIndex = 0; + + for (PCollectionImpl<?> pcollection : outputs.keySet()) { + + String pathColor = COLORS[outputIndex++]; + + formatPCollectionLineage(pcollection, pathColor); + + for (Target target : outputs.get(pcollection)) { + formatTarget(target, DEFAULT_FOLDER_COLOR); + link(pcollection, target, pathColor); + } + } + } + + @Override + protected void doGetLegend(StringBuilder lsb) { + lsb.append("\"Folder\" [label=\"Folder Name\" fontsize=10 
shape=folder color=darkGreen]\n").append( + "\"PCollection\" [label=\"{PCollection Name | PCollection Class| PType }\" fontsize=10 shape=record]\n"); + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java new file mode 100644 index 0000000..58bb95d --- /dev/null +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriterRTNodes.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.crunch.impl.mr.plan; + +import static java.lang.String.format; +import static org.apache.commons.collections.CollectionUtils.isEmpty; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.lang.StringUtils; +import org.apache.crunch.CrunchRuntimeException; +import org.apache.crunch.impl.mr.MRJob; +import org.apache.crunch.impl.mr.run.NodeContext; +import org.apache.crunch.impl.mr.run.RTNode; +import org.apache.crunch.io.CrunchInputs; +import org.apache.crunch.io.CrunchOutputs; +import org.apache.crunch.io.CrunchOutputs.OutputConfig; +import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.types.Converter; +import org.apache.crunch.util.DistCache; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; + +/** + * Writes <a href="http://www.graphviz.org">Graphviz</a> dot files to illustrate the topology of Crunch pipelines. + */ +@SuppressWarnings({ "rawtypes", "unchecked" }) +public class DotfileWriterRTNodes extends CommonDotfileWriter { + + private static final String GREEN = "green"; + private static final String RED = "red"; + private static final String CYAN = "cyan"; + private static final String BLUE = "blue"; + private static final String BLACK = "black"; + + private List<MRJob> mrJobs; + + public DotfileWriterRTNodes(List<MRJob> mrJobs) { + super(); + this.mrJobs = mrJobs; + } + + private String getId(RTNode rtNode) { + return format("\"%s@%d\"", rtNode.getNodeName(), rtNode.hashCode()); + } + + private String getOutputNameId(String outputName, MRJob mrJob) { + return format("\"%s@%s\"", outputName, mrJob.getJobID()); + } + + private String getId(FormatBundle bundle, MRJob mrJob) { + String name = (bundle == null) ? 
"-" : bundle.getName(); + return format("\"%s@%s\"", name, mrJob.getJobID()); + } + + private String formatConvertor(Converter converter) { + StringBuffer sb = new StringBuffer(); + sb.append(className(converter)); + if (converter != null) { + if (!converter.applyPTypeTransforms()) { + sb.append(" (applyPTypeTransforms = ").append(converter.applyPTypeTransforms()).append(")"); + } + sb.append("[").append(converter.getKeyClass().getSimpleName()).append(", ") + .append(converter.getValueClass().getSimpleName()).append("]"); + } + return sb.toString(); + } + + private String formatRTNode(RTNode rtNode) { + return format("%s [label=\"{{%s | %s} | %s | %s | { %s | %s } }\" shape=record; color = black;];\n", getId(rtNode), + label(rtNode.getNodeName()), label(rtNode.getOutputName()), className(rtNode.getDoFn()), + formatPType(rtNode.getPType()), formatConvertor(rtNode.getInputConverter()), + formatConvertor(rtNode.getOutputConverter())); + } + + private void formatRTNodeTree(RTNode parentRTNode) { + + contentBuilder.append(formatRTNode(parentRTNode)); + + if (!isEmpty(parentRTNode.getChildren())) { + for (RTNode child : parentRTNode.getChildren()) { + // process child nodes + formatRTNodeTree(child); + // link parent to child node + link(getId(parentRTNode), getId(child), BLACK); + } + } + } + + private List<RTNode> formatMRJobTask(Configuration configuration, int jobId, NodeContext nodeContext, String color) { + + List<RTNode> rtNodes = getRTNodes(configuration, nodeContext); + if (rtNodes == null) + return null; + + contentBuilder.append("subgraph \"cluster-job" + jobId + "_" + nodeContext + "\" {\n"); + contentBuilder.append(" label=\"" + nodeContext + "\"; color=" + color + "; fontsize=14;\n"); + + for (RTNode rtn : rtNodes) { + formatRTNodeTree(rtn); + } + contentBuilder.append("}\n"); + + return rtNodes; + } + + private void formatJobOutputs(Map<String, OutputConfig> namedOutputs, MRJob mrJob) { + + contentBuilder.append("subgraph \"cluster-output_" + 
mrJob.getJobID() + "\" {\n"); + contentBuilder.append(" label=\"OUTPUTS\"; fontsize=14; color= magenta;\n"); + + for (Entry<String, OutputConfig> entry : namedOutputs.entrySet()) { + String output = format("%s [label=\"{%s | %s | { %s | %s } }\" shape=record; color = %s];\n", + getOutputNameId(entry.getKey(), mrJob), entry.getKey(), entry.getValue().bundle.getName(), + entry.getValue().keyClass.getSimpleName(), entry.getValue().valueClass.getSimpleName(), BLACK); + + contentBuilder.append(output); + } + + contentBuilder.append("}\n"); + } + + private void formatJobInputs(Map<FormatBundle, Map<Integer, List<Path>>> inputFormatNodeMap, MRJob mrJob, String color) { + + contentBuilder.append("subgraph \"cluster-inputs_" + mrJob.getJobID() + "\" {\n"); + contentBuilder.append(" label=\"INPUTS\"; fontsize=14; color= " + color + ";\n"); + + for (Entry<FormatBundle, Map<Integer, List<Path>>> entry : inputFormatNodeMap.entrySet()) { + + FormatBundle bundle = entry.getKey(); + + ArrayList<String> inList = new ArrayList<String>(); + for (Entry<Integer, List<Path>> value : entry.getValue().entrySet()) { + inList.add(format("{ %s | %s}", value.getKey(), value.getValue())); + } + + contentBuilder.append(format("%s [label=\"{ %s | %s}\" shape=record; color = %s];\n", getId(bundle, mrJob), + bundle.getName(), Joiner.on("|").join(inList), BLACK)); + } + + contentBuilder.append("}\n"); + } + + private FormatBundle findFormatBundleByNodeIndex(Map<FormatBundle, Map<Integer, List<Path>>> inputFormatNodeMap, + int nodeIndex) { + for (Entry<FormatBundle, Map<Integer, List<Path>>> entry : inputFormatNodeMap.entrySet()) { + if (entry.getValue().containsKey(nodeIndex)) { + return entry.getKey(); + } + if (nodeIndex == 0 && entry.getValue().containsKey(-1)) { + return entry.getKey(); + } + } + return null; + } + + private List<RTNode> leafs(List<RTNode> rtNodes) { + + ArrayList<RTNode> tails = Lists.newArrayListWithExpectedSize(rtNodes.size()); + + for (RTNode node : rtNodes) { + 
tails.addAll(leafs(node)); + } + return tails; + } + + private List<RTNode> leafs(RTNode rtNode) { + + List<RTNode> leafs = Lists.newArrayList(); + + if (rtNode.isLeafNode()) { + leafs.add(rtNode); + } else { + for (RTNode child : rtNode.getChildren()) { + leafs.addAll(leafs(child)); + } + } + + return leafs; + } + + private static List<RTNode> getRTNodes(Configuration conf, NodeContext nodeContext) { + Path path = new Path(new Path(conf.get(PlanningParameters.CRUNCH_WORKING_DIRECTORY)), nodeContext.toString()); + try { + return (List<RTNode>) DistCache.read(conf, path); + } catch (IOException e) { + throw new CrunchRuntimeException("Could not read runtime node information", e); + } + } + + @Override + protected void doBuildDiagram() { + + for (MRJob mrJob : mrJobs) { + + // TODO to find a way to handle job dependencies e.g mrJob.getDependentJobs() + + Configuration configuration = mrJob.getJob().getConfiguration(); + + contentBuilder.append("subgraph \"cluster-job" + mrJob.getJobID() + "\" {\n"); + contentBuilder.append(" label=\"Crunch Job " + mrJob.getJobID() + "\" ;\n"); + + List<RTNode> mapRTNodes = formatMRJobTask(configuration, mrJob.getJobID(), NodeContext.MAP, BLUE); + List<RTNode> combineRTNodes = formatMRJobTask(configuration, mrJob.getJobID(), NodeContext.COMBINE, CYAN); + List<RTNode> reduceRTNodes = formatMRJobTask(configuration, mrJob.getJobID(), NodeContext.REDUCE, RED); + + // Deserialize Job's inputs from the CRUNCH_INPUTS Configuration property. 
+ Map<FormatBundle, Map<Integer, List<Path>>> inputFormatNodeMap = CrunchInputs.getFormatNodeMap(mrJob.getJob()); + + formatJobInputs(inputFormatNodeMap, mrJob, GREEN); + + // Link inputs to map RTNode tasks + for (int mapNodeIndex = 0; mapNodeIndex < mapRTNodes.size(); mapNodeIndex++) { + FormatBundle formatBundle = findFormatBundleByNodeIndex(inputFormatNodeMap, mapNodeIndex); + RTNode rtNode = mapRTNodes.get(mapNodeIndex); + link(getId(formatBundle, mrJob), getId(rtNode), BLACK); + } + + // Deserialize Job's Outputs from the CRUNCH_OUTPUTS Configuration property. + Map<String, OutputConfig> namedOutputs = CrunchOutputs.getNamedOutputs(configuration); + + formatJobOutputs(namedOutputs, mrJob); + + List<RTNode> mapLeafs = leafs(mapRTNodes); + + for (RTNode leafNode : mapLeafs) { + String outputName = leafNode.getOutputName(); + if (StringUtils.isEmpty(outputName)) { + if (!isEmpty(combineRTNodes)) { + // If there is a combiner connect the map to the combiner and then the combiner to the reducer + link(getId(leafNode), getId(combineRTNodes.get(0)), BLACK); + link(getId(leafs(combineRTNodes).get(0)), getId(reduceRTNodes.get(0)), BLACK); + } else { + // connect + link(getId(leafNode), getId(reduceRTNodes.get(0)), BLACK); + } + } else { + link(getId(leafNode), getOutputNameId(outputName, mrJob), BLACK); + } + } + + if (!isEmpty(reduceRTNodes)) { + List<RTNode> reduceTails = leafs(reduceRTNodes); + for (RTNode tailNode : reduceTails) { + String outputName = tailNode.getOutputName(); + if (StringUtils.isEmpty(outputName)) { + throw new RuntimeException("Reduce output RTNode with no named output! 
:" + tailNode); + } else { + link(getId(tailNode), getOutputNameId(outputName, mrJob), BLACK); + } + } + } + + contentBuilder.append("}\n"); + + } + } + + @Override + protected void doGetLegend(StringBuilder lsb) { + lsb.append( + "\"RTNodes\" [label=\"{{RTNode Name | Output Name } | DoFn | PType | { Input Converter | Output Converter}}\"; shape=record;]\n") + .append("\"Inputs\" [label=\"{InputFormat Name | {Node Index | Path List}}\"; shape=record; color = green]\n") + .append( + "\"Outputs\" [label=\"{Output Name | OutputFormat Name |{Key Class | Value Class}}\"; shape=record; color = magenta]\n") + .append("\"Inputs\" -> \"RTNodes\" [style=invis];\n").append("\"RTNodes\" -> \"Outputs\" [style=invis];\n"); + + } +} http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index 2863e00..d341184 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -34,7 +34,6 @@ import org.apache.crunch.impl.mr.exec.CrunchJobHooks; import org.apache.crunch.impl.mr.run.CrunchCombiner; import org.apache.crunch.impl.mr.run.CrunchInputFormat; import org.apache.crunch.impl.mr.run.CrunchMapper; -import org.apache.crunch.impl.mr.run.CrunchOutputFormat; import org.apache.crunch.impl.mr.run.CrunchReducer; import org.apache.crunch.impl.mr.run.NodeContext; import org.apache.crunch.impl.mr.run.RTNode; @@ -215,7 +214,6 @@ class JobPrototype { job.setNumReduceTasks(0); inputNodes = Lists.newArrayList(outputNodes); } - job.setOutputFormatClass(CrunchOutputFormat.class); serialize(inputNodes, conf, workingPath, NodeContext.MAP); if (inputNodes.size() == 1) { 
http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java index 91e3036..b470586 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java @@ -36,15 +36,15 @@ import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; import org.apache.crunch.impl.mr.exec.MRExecutor; import org.apache.crunch.materialize.MaterializableIterable; import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMultimap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import com.google.common.collect.ImmutableMultimap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class MSCRPlanner { @@ -86,19 +86,26 @@ public class MSCRPlanner { }; public MRExecutor plan(Class<?> jarClass, Configuration conf) throws IOException { + + DotfileUtills dotfileUtills = new DotfileUtills(jarClass, conf); + + // Generate the debug lineage dotfiles (if configuration is enabled) + dotfileUtills.buildLineageDotfile(outputs); + Map<PCollectionImpl<?>, Set<Target>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR); for (PCollectionImpl<?> pcollect : outputs.keySet()) { targetDeps.put(pcollect, pcollect.getTargetDependencies()); } - + Multimap<Target, JobPrototype> assignments = HashMultimap.create(); + while (!targetDeps.isEmpty()) { Set<Target> allTargets = Sets.newHashSet(); for (PCollectionImpl<?> pcollect : targetDeps.keySet()) { 
allTargets.addAll(outputs.get(pcollect)); } GraphBuilder graphBuilder = new GraphBuilder(); - + // Walk the current plan tree and build a graph in which the vertices are // sources, targets, and GBK operations. Set<PCollectionImpl<?>> currentStage = Sets.newHashSet(); @@ -109,7 +116,7 @@ public class MSCRPlanner { currentStage.add(output); } } - + Graph baseGraph = graphBuilder.getGraph(); boolean hasInputs = false; for (Vertex v : baseGraph) { @@ -125,10 +132,14 @@ public class MSCRPlanner { // Create a new graph that splits up up dependent GBK nodes. Graph graph = prepareFinalGraph(baseGraph); - + // Break the graph up into connected components. List<List<Vertex>> components = graph.connectedComponents(); - + + // Generate the debug graph dotfiles (if configuration is enabled) + dotfileUtills.buildBaseGraphDotfile(outputs, graph); + dotfileUtills.buildSplitGraphDotfile(outputs, graph, components); + // For each component, we will create one or more job prototypes, // depending on its profile. // For dependency handling, we only need to care about which @@ -191,18 +202,22 @@ public class MSCRPlanner { targetDeps.remove(output); } } - + // Finally, construct the jobs from the prototypes and return. 
- DotfileWriter dotfileWriter = new DotfileWriter(); MRExecutor exec = new MRExecutor(conf, jarClass, outputs, toMaterialize, appendedTargets, pipelineCallables); + + // Generate the debug Plan dotfiles + dotfileUtills.buildPlanDotfile(exec, assignments, pipeline, lastJobID); + for (JobPrototype proto : Sets.newHashSet(assignments.values())) { - dotfileWriter.addJobPrototype(proto); exec.addJob(proto.getCrunchJob(jarClass, conf, pipeline, lastJobID)); } - String planDotFile = dotfileWriter.buildDotfile(); - exec.setPlanDotFile(planDotFile); - conf.set(PlanningParameters.PIPELINE_PLAN_DOTFILE, planDotFile); + // Generate the debug RTNode dotfiles (if configuration is enabled) + dotfileUtills.buildRTNodesDotfile(exec); + + // Attach the dotfiles to the MRExecutor context + dotfileUtills.addDotfilesToContext(exec); return exec; } http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java index de89c48..a40abf9 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/PlanningParameters.java @@ -33,6 +33,8 @@ public final class PlanningParameters { */ public static final String PIPELINE_PLAN_DOTFILE = "crunch.planner.dotfile"; + public static final String DEBUG_DOTFILES_ENABLED = "crunch.internals.dotfiles"; + /** * Configuration key under which a directory URI can be stored where MapReduce pipeline job plans in * <a href="http://www.graphviz.org">DOT</a> format are stored. 
The dot files are only written if this configuration http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java deleted file mode 100644 index bd9cdc9..0000000 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/CrunchOutputFormat.java +++ /dev/null @@ -1,54 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.crunch.impl.mr.run; - -import org.apache.crunch.io.CrunchOutputs; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; - -public class CrunchOutputFormat<K, V> extends OutputFormat<K, V> { - @Override - public RecordWriter<K, V> getRecordWriter(TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - return new RecordWriter<K, V>() { - @Override - public void write(K k, V v) throws IOException, InterruptedException { - } - - @Override - public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { - } - }; - } - - @Override - public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { - CrunchOutputs.checkOutputSpecs(jobContext); - } - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - return CrunchOutputs.getOutputCommitter(taskAttemptContext); - } -} http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java index 650d0c9..37bf5b6 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java @@ -126,4 +126,37 @@ public class RTNode implements Serializable { return "RTNode [nodeName=" + nodeName + ", fn=" + fn + ", children=" + children + ", inputConverter=" + inputConverter + ", outputConverter=" + outputConverter + ", outputName=" + 
outputName + "]"; } + + // Attributes needed to plot the dotfile diagrams + public String getNodeName() { + return this.nodeName; + } + + public String getOutputName() { + return this.outputName; + } + + public PType getPType() { + return outputPType; + } + + public List<RTNode> getChildren() { + return children; + } + + public DoFn<Object, Object> getDoFn() { + return fn; + } + + public Converter getInputConverter() { + return inputConverter; + } + + public Converter getOutputConverter() { + return outputConverter; + } + + public Emitter<Object> getEmitter() { + return emitter; + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java index a536b38..e811bcf 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java @@ -17,20 +17,14 @@ */ package org.apache.crunch.io; -import com.google.common.collect.Sets; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.OutputCommitter; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskInputOutputContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.base.Joiner; @@ -41,7 +35,6 @@ import 
com.google.common.collect.Maps; import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Set; /** * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances @@ -71,32 +64,7 @@ public class CrunchOutputs<K, V> { conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs); } - public static void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException { - Map<String, OutputConfig> outputs = getNamedOutputs(jc.getConfiguration()); - for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) { - String namedOutput = e.getKey(); - Job job = getJob(e.getKey(), jc.getConfiguration()); - OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue()); - fmt.checkOutputSpecs(jc); - } - } - - public static OutputCommitter getOutputCommitter(TaskAttemptContext tac) throws IOException, InterruptedException { - Map<String, OutputConfig> outputs = getNamedOutputs(tac.getConfiguration()); - Map<String, OutputCommitter> committers = Maps.newHashMap(); - for (Map.Entry<String, OutputConfig> e : outputs.entrySet()) { - String namedOutput = e.getKey(); - Job job = getJob(e.getKey(), tac.getConfiguration()); - OutputFormat fmt = getOutputFormat(namedOutput, job, e.getValue()); - TaskAttemptContext taskContext = TaskAttemptContextFactory.create( - job.getConfiguration(), tac.getTaskAttemptID()); - OutputCommitter oc = fmt.getOutputCommitter(taskContext); - committers.put(namedOutput, oc); - } - return new CompositeOutputCommitter(outputs, committers); - } - - private static class OutputConfig<K, V> { + public static class OutputConfig<K, V> { public FormatBundle<OutputFormat<K, V>> bundle; public Class<K> keyClass; public Class<V> valueClass; @@ -108,14 +76,15 @@ public class CrunchOutputs<K, V> { this.valueClass = valueClass; } } - - private static Map<String, OutputConfig> getNamedOutputs(Configuration conf) { + + private static Map<String, OutputConfig> getNamedOutputs( + 
TaskInputOutputContext<?, ?, ?, ?> context) { + return getNamedOutputs(context.getConfiguration()); + } + + public static Map<String, OutputConfig> getNamedOutputs(Configuration conf) { Map<String, OutputConfig> out = Maps.newHashMap(); - String serOut = conf.get(CRUNCH_OUTPUTS); - if (serOut == null || serOut.isEmpty()) { - return out; - } - for (String input : Splitter.on(RECORD_SEP).split(serOut)) { + for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) { List<String> fields = Lists.newArrayList(SPLITTER.split(input)); String name = fields.get(0); FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1), conf); @@ -129,14 +98,13 @@ public class CrunchOutputs<K, V> { } return out; } - private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename"; private static final String COUNTERS_GROUP = CrunchOutputs.class.getName(); - private TaskInputOutputContext<?, ?, K, V> baseContext; - private Configuration baseConf; + private final TaskInputOutputContext<?, ?, K, V> baseContext; private final Map<String, OutputConfig> namedOutputs; - private final Map<String, OutputState<K, V>> outputStates; + private final Map<String, RecordWriter<K, V>> recordWriters; + private final Map<String, TaskAttemptContext> taskContextCache; private final boolean disableOutputCounters; /** @@ -146,15 +114,11 @@ public class CrunchOutputs<K, V> { * @param context the TaskInputOutputContext object */ public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) { - this(context.getConfiguration()); this.baseContext = context; - } - - public CrunchOutputs(Configuration conf) { - this.baseConf = conf; - this.namedOutputs = getNamedOutputs(conf); - this.outputStates = Maps.newHashMap(); - this.disableOutputCounters = conf.getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false); + namedOutputs = getNamedOutputs(context); + recordWriters = Maps.newHashMap(); + taskContextCache = Maps.newHashMap(); + this.disableOutputCounters = 
context.getConfiguration().getBoolean(CRUNCH_DISABLE_OUTPUT_COUNTERS, false); } @SuppressWarnings("unchecked") @@ -164,174 +128,63 @@ public class CrunchOutputs<K, V> { throw new IllegalArgumentException("Undefined named output '" + namedOutput + "'"); } + TaskAttemptContext taskContext = getContext(namedOutput); if (!disableOutputCounters) { baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1); } - getOutputState(namedOutput).write(key, value); + getRecordWriter(taskContext, namedOutput).write(key, value); } public void close() throws IOException, InterruptedException { - for (OutputState<?, ?> out : outputStates.values()) { - out.close(); + for (RecordWriter<?, ?> writer : recordWriters.values()) { + writer.close(baseContext); } } - private OutputState<K, V> getOutputState(String namedOutput) throws IOException, InterruptedException { - OutputState<?, ?> out = outputStates.get(namedOutput); - if (out != null) { - return (OutputState<K, V>) out; + private TaskAttemptContext getContext(String nameOutput) throws IOException { + TaskAttemptContext taskContext = taskContextCache.get(nameOutput); + if (taskContext != null) { + return taskContext; } // The following trick leverages the instantiation of a record writer via // the job thus supporting arbitrary output formats. 
- Job job = getJob(namedOutput, baseConf); - OutputFormat<K, V> fmt = getOutputFormat(namedOutput, job, namedOutputs.get(namedOutput)); - TaskAttemptContext taskContext = null; - RecordWriter<K, V> recordWriter = null; - if (baseContext != null) { - taskContext = TaskAttemptContextFactory.create( - job.getConfiguration(), baseContext.getTaskAttemptID()); - recordWriter = fmt.getRecordWriter(taskContext); - } - OutputState<K, V> outputState = new OutputState(taskContext, recordWriter); - this.outputStates.put(namedOutput, outputState); - return outputState; - } - - private static Job getJob(String namedOutput, Configuration baseConf) throws IOException { - Job job = new Job(new Configuration(baseConf)); - job.getConfiguration().set("crunch.namedoutput", namedOutput); - return job; - } - - private static void configureJob( - String namedOutput, - Job job, - OutputConfig outConfig) throws IOException { - job.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput); - job.setOutputFormatClass(outConfig.bundle.getFormatClass()); - job.setOutputKeyClass(outConfig.keyClass); - job.setOutputValueClass(outConfig.valueClass); - outConfig.bundle.configure(job.getConfiguration()); - } - - private static OutputFormat getOutputFormat( - String namedOutput, - Job job, - OutputConfig outConfig) throws IOException { - configureJob(namedOutput, job, outConfig); - try { - return ReflectionUtils.newInstance( - job.getOutputFormatClass(), - job.getConfiguration()); - } catch (ClassNotFoundException e) { - throw new IOException(e); - } + OutputConfig outConfig = namedOutputs.get(nameOutput); + Configuration conf = new Configuration(baseContext.getConfiguration()); + Job job = new Job(conf); + job.getConfiguration().set("crunch.namedoutput", nameOutput); + job.setOutputFormatClass(outConfig.bundle.getFormatClass()); + job.setOutputKeyClass(outConfig.keyClass); + job.setOutputValueClass(outConfig.valueClass); + outConfig.bundle.configure(job.getConfiguration()); + taskContext = 
TaskAttemptContextFactory.create( + job.getConfiguration(), baseContext.getTaskAttemptID()); + + taskContextCache.put(nameOutput, taskContext); + return taskContext; } - private static class OutputState<K, V> { - private final TaskAttemptContext context; - private final RecordWriter<K, V> recordWriter; - - public OutputState(TaskAttemptContext context, RecordWriter<K, V> recordWriter) { - this.context = context; - this.recordWriter = recordWriter; - } - - public void write(K key, V value) throws IOException, InterruptedException { - recordWriter.write(key, value); - } - - public void close() throws IOException, InterruptedException { - recordWriter.close(context); - } - } - - private static class CompositeOutputCommitter extends OutputCommitter { - - private final Map<String, OutputConfig> outputs; - private final Map<String, OutputCommitter> committers; - - public CompositeOutputCommitter(Map<String, OutputConfig> outputs, Map<String, OutputCommitter> committers) { - this.outputs = outputs; - this.committers = committers; - } - - private TaskAttemptContext getContext(String namedOutput, TaskAttemptContext baseContext) throws IOException { - Job job = getJob(namedOutput, baseContext.getConfiguration()); - configureJob(namedOutput, job, outputs.get(namedOutput)); - return TaskAttemptContextFactory.create(job.getConfiguration(), baseContext.getTaskAttemptID()); - } - - @Override - public void setupJob(JobContext jobContext) throws IOException { - Configuration conf = jobContext.getConfiguration(); - for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { - Job job = getJob(e.getKey(), conf); - configureJob(e.getKey(), job, outputs.get(e.getKey())); - e.getValue().setupJob(job); - } - } - - @Override - public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException { - for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { - e.getValue().setupTask(getContext(e.getKey(), taskAttemptContext)); - } - } - - @Override - public 
boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException { - for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { - if (e.getValue().needsTaskCommit(getContext(e.getKey(), taskAttemptContext))) { - return true; - } - } - return false; - } - - @Override - public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException { - for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { - e.getValue().commitTask(getContext(e.getKey(), taskAttemptContext)); - } - } - - @Override - public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException { - for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { - e.getValue().abortTask(getContext(e.getKey(), taskAttemptContext)); - } - } + private synchronized RecordWriter<K, V> getRecordWriter( + TaskAttemptContext taskContext, String namedOutput) + throws IOException, InterruptedException { + // look for record-writer in the cache + RecordWriter<K, V> writer = recordWriters.get(namedOutput); - @Override - public void commitJob(JobContext jobContext) throws IOException { - Configuration conf = jobContext.getConfiguration(); - Set<Path> handledPaths = Sets.newHashSet(); - for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { - OutputCommitter oc = e.getValue(); - if (oc instanceof FileOutputCommitter) { - Path workPath = ((FileOutputCommitter) oc).getWorkPath(); - if (handledPaths.contains(workPath)) { - continue; - } else { - handledPaths.add(workPath); - } - } - Job job = getJob(e.getKey(), conf); - configureJob(e.getKey(), job, outputs.get(e.getKey())); - oc.commitJob(job); + // If not in cache, create a new one + if (writer == null) { + // get the record writer from context output format + taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput); + try { + OutputFormat format = ReflectionUtils.newInstance( + taskContext.getOutputFormatClass(), + taskContext.getConfiguration()); + writer = 
format.getRecordWriter(taskContext); + } catch (ClassNotFoundException e) { + throw new IOException(e); } + recordWriters.put(namedOutput, writer); } - @Override - public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { - Configuration conf = jobContext.getConfiguration(); - for (Map.Entry<String, OutputCommitter> e : committers.entrySet()) { - Job job = getJob(e.getKey(), conf); - configureJob(e.getKey(), job, outputs.get(e.getKey())); - e.getValue().abortJob(job, state); - } - } + return writer; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java ---------------------------------------------------------------------- diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java index 5d0f953..3b5b419 100644 --- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java +++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkRuntime.java @@ -18,12 +18,14 @@ package org.apache.crunch.impl.spark; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.common.util.concurrent.AbstractFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; + import org.apache.crunch.CombineFn; import org.apache.crunch.PCollection; import org.apache.crunch.PipelineCallable; @@ -206,6 +208,11 @@ public class SparkRuntime extends AbstractFuture<PipelineResult> implements Pipe } @Override + public Map<String, String> getNamedDotFiles() { + return ImmutableMap.of("", ""); + } + + @Override public void waitFor(long timeout, TimeUnit timeUnit) throws InterruptedException { doneSignal.await(timeout, 
timeUnit); } http://git-wip-us.apache.org/repos/asf/crunch/blob/6543ded5/src/main/config/checkstyle.xml ---------------------------------------------------------------------- diff --git a/src/main/config/checkstyle.xml b/src/main/config/checkstyle.xml index 4c74367..7cb721b 100644 --- a/src/main/config/checkstyle.xml +++ b/src/main/config/checkstyle.xml @@ -74,7 +74,6 @@ under the License. <module name="NeedBraces"/> <module name="RightCurly"/> <module name="AvoidInlineConditionals"/> - <module name="DoubleCheckedLocking"/> <module name="EmptyStatement"/> <module name="EqualsHashCode"/> <module name="HiddenField">
