Repository: crunch
Updated Branches:
  refs/heads/master 0b19717d1 -> 3fff74e2e
CRUNCH-519: Add more detail to plan dot file. Contributed by Ron Hashimshony. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3fff74e2 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3fff74e2 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3fff74e2 Branch: refs/heads/master Commit: 3fff74e2e18b22d5cdc302f52e19ef10028a0c31 Parents: 0b19717 Author: Josh Wills <[email protected]> Authored: Tue Aug 23 20:07:23 2016 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Aug 23 20:07:23 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/crunch/Breakpoint2IT.java | 2 +- .../impl/mr/collect/PGroupedTableImpl.java | 14 ++++++++ .../crunch/impl/mr/plan/DotfileWriter.java | 22 +++++++++++- .../crunch/impl/mr/plan/DotfileWriterTest.java | 36 ++++++++++++++++++-- 4 files changed, 70 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/3fff74e2/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java index 4b76c8b..4cb1af4 100644 --- a/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java +++ b/crunch-core/src/it/java/org/apache/crunch/Breakpoint2IT.java @@ -96,7 +96,7 @@ public class Breakpoint2IT { MRPipelineExecution exec = pipeline.runAsync(); int fnCount = 0; for (String line : exec.getPlanDotFile().split("\n")) { - if (line.contains("label=\"Transform pCol1 to PTable\"")) { + if (line.contains("label=\"Transform pCol1 to PTable 0 Mb\"")) { fnCount++; } } http://git-wip-us.apache.org/repos/asf/crunch/blob/3fff74e2/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java 
---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java index 0c8de36..278b74a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java @@ -61,4 +61,18 @@ public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements M public DoNode getGroupingNode() { return DoNode.createGroupingNode("", ptype); } + + public int getNumReduceTasks() { + int numReduceTasks; + if (groupingOptions == null || groupingOptions.getNumReducers() <= 0) { + numReduceTasks = PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration()); + } else { + numReduceTasks = groupingOptions.getNumReducers(); + } + return numReduceTasks; + } + + public boolean isNumReduceTasksSetByUser() { + return (groupingOptions != null && groupingOptions.getNumReducers() > 0); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/3fff74e2/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java index de96852..24afb7c 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DotfileWriter.java @@ -17,6 +17,7 @@ */ package org.apache.crunch.impl.mr.plan; +import java.text.DecimalFormat; import java.util.List; import java.util.Map; import java.util.Set; @@ -64,9 +65,28 @@ public class DotfileWriter { if (pcollectionImpl instanceof InputCollection) { shape = "folder"; } - return String.format("%s [label=\"%s\" 
shape=%s];", + + String size = ""; + try { + DecimalFormat formatter = new DecimalFormat("#,###.##"); + size = " " + formatter.format(pcollectionImpl.getSize()/1024.0/1024.0) + " Mb"; + } catch (Exception e) { + // Just skip those that don't have a size + } + + if (pcollectionImpl instanceof PGroupedTableImpl) { + int numReduceTasks = ((PGroupedTableImpl) pcollectionImpl).getNumReduceTasks(); + if (numReduceTasks > 0) { + PGroupedTableImpl pGroupedTable = (PGroupedTableImpl) pcollectionImpl; + String setByUser = pGroupedTable.isNumReduceTasksSetByUser() ? "Manual" : "Automatic"; + size += " (" + pGroupedTable.getNumReduceTasks() + " " + setByUser + " reducers)"; + } + } + + return String.format("%s [label=\"%s%s\" shape=%s];", formatPCollection(pcollectionImpl, jobPrototype), limitNodeNameLength(pcollectionImpl.getName()), + size, shape); } http://git-wip-us.apache.org/repos/asf/crunch/blob/3fff74e2/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java index 239da53..d7ba828 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/DotfileWriterTest.java @@ -34,6 +34,7 @@ import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.impl.dist.collect.PCollectionImpl; import org.apache.crunch.impl.mr.collect.InputCollection; +import org.apache.crunch.impl.mr.collect.PGroupedTableImpl; import org.apache.crunch.impl.mr.plan.DotfileWriter.MRTaskType; import org.junit.Before; import org.junit.Test; @@ -53,9 +54,10 @@ public class DotfileWriterTest { PCollectionImpl<?> pcollectionImpl = mock(PCollectionImpl.class); JobPrototype jobPrototype = mock(JobPrototype.class); 
when(pcollectionImpl.getName()).thenReturn("collection"); + when(pcollectionImpl.getSize()).thenReturn(1024L * 500L); assertEquals("\"collection@" + pcollectionImpl.hashCode() + "@" + jobPrototype.hashCode() - + "\" [label=\"collection\" shape=box];", + + "\" [label=\"collection 0.49 Mb\" shape=box];", dotfileWriter.formatPCollectionNodeDeclaration(pcollectionImpl, jobPrototype)); } @@ -65,12 +67,42 @@ public class DotfileWriterTest { JobPrototype jobPrototype = mock(JobPrototype.class); when(inputCollection.getName()).thenReturn("input"); when(inputCollection.getSource().toString()).thenReturn("source"); + when(inputCollection.getSize()).thenReturn(1024L * 1024L * 1729L); - assertEquals("\"source\" [label=\"input\" shape=folder];", + assertEquals("\"source\" [label=\"input 1,729 Mb\" shape=folder];", dotfileWriter.formatPCollectionNodeDeclaration(inputCollection, jobPrototype)); } @Test + public void testFormatPGroupedTableImplDeclarationAutomatic() { + PGroupedTableImpl<?,?> inputCollection = mock(PGroupedTableImpl.class, Mockito.RETURNS_DEEP_STUBS); + JobPrototype jobPrototype = mock(JobPrototype.class); + when(inputCollection.getName()).thenReturn("GBK"); + when(inputCollection.getSize()).thenReturn(1024L * 1024L * 1729L); + when(inputCollection.getNumReduceTasks()).thenReturn(10); + + String expected = "\"GBK@" + inputCollection.hashCode() + "@" + jobPrototype.hashCode() + "\" [label=\"GBK " + + "1,729 Mb (10 Automatic reducers)\" shape=box];"; + + assertEquals(expected, dotfileWriter.formatPCollectionNodeDeclaration(inputCollection, jobPrototype)); + } + + @Test + public void testFormatPGroupedTableImplDeclarationManual() { + PGroupedTableImpl<?,?> inputCollection = mock(PGroupedTableImpl.class, Mockito.RETURNS_DEEP_STUBS); + JobPrototype jobPrototype = mock(JobPrototype.class); + when(inputCollection.getName()).thenReturn("collection"); + when(inputCollection.getSize()).thenReturn(1024L * 1024L * 1729L); + 
when(inputCollection.getNumReduceTasks()).thenReturn(50); + when(inputCollection.isNumReduceTasksSetByUser()).thenReturn(true); + + String expected = "\"collection@" + inputCollection.hashCode() + "@" + jobPrototype.hashCode() + "\" [label=\"collection " + + "1,729 Mb (50 Manual reducers)\" shape=box];"; + + assertEquals(expected, dotfileWriter.formatPCollectionNodeDeclaration(inputCollection, jobPrototype)); + } + + @Test public void testFormatTargetNodeDeclaration() { Target target = mock(Target.class); when(target.toString()).thenReturn("target/path");
