Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 a96e30892 -> b6f203e7a
CRUNCH-381 Use descriptions in parallelDo calls Add simple descriptions to PCollection#parallelDo calls in library methods so that it's easier to trace through job plans. Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/b6f203e7 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/b6f203e7 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/b6f203e7 Branch: refs/heads/apache-crunch-0.8 Commit: b6f203e7a1313002722a0ae12a16882386af6e82 Parents: a96e308 Author: Gabriel Reid <[email protected]> Authored: Fri Apr 25 17:09:13 2014 +0200 Committer: Gabriel Reid <[email protected]> Committed: Fri Apr 25 22:15:28 2014 +0200 ---------------------------------------------------------------------- .../org/apache/crunch/impl/dist/collect/PCollectionImpl.java | 2 +- .../src/main/java/org/apache/crunch/lib/Cartesian.java | 2 +- .../src/main/java/org/apache/crunch/lib/Channels.java | 4 ++-- crunch-core/src/main/java/org/apache/crunch/lib/Sample.java | 8 ++++---- crunch-core/src/main/java/org/apache/crunch/lib/Set.java | 3 +++ 5 files changed, 11 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java index 7f6984a..a1e70fe 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/PCollectionImpl.java @@ -228,7 +228,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> { @Override public PCollection<S> filter(FilterFn<S> filterFn) { - return 
parallelDo(filterFn, getPType()); + return parallelDo("Filter with " + filterFn.getClass().getSimpleName(), filterFn, getPType()); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java index 08327dd..6d47348 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cartesian.java @@ -205,7 +205,7 @@ public class Cartesian { PTypeFamily ctf = cg.getTypeFamily(); - return cg.parallelDo(new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() { + return cg.parallelDo("Extract second element", new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() { @Override public Pair<U, V> map(Pair<Pair<Integer, Integer>, Pair<U, V>> input) { return input.second(); http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java index 568ca20..2e0fe1d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Channels.java @@ -59,8 +59,8 @@ public class Channels { public static <T, U> Pair<PCollection<T>, PCollection<U>> split(PCollection<Pair<T, U>> pCollection, PType<T> firstPType, PType<U> secondPType) { - PCollection<T> first = pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType); - PCollection<U> second = pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType); + PCollection<T> first = pCollection.parallelDo("Extract first value", new 
FirstEmittingDoFn<T, U>(), firstPType); + PCollection<U> second = pCollection.parallelDo("Extract second value", new SecondEmittingDoFn<T, U>(), secondPType); return Pair.of(first, second); } http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java b/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java index 5266545..ec0006a 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Sample.java @@ -119,7 +119,7 @@ public class Sample { PTypeFamily ptf = input.getTypeFamily(); PType<Pair<T, Integer>> ptype = ptf.pairs(input.getPType(), ptf.ints()); return weightedReservoirSample( - input.parallelDo(new MapFn<T, Pair<T, Integer>>() { + input.parallelDo("Map to pairs for reservoir sampling", new MapFn<T, Pair<T, Integer>>() { @Override public Pair<T, Integer> map(T t) { return Pair.of(t, 1); } }, ptype), @@ -163,7 +163,7 @@ public class Sample { }, ptf.tableOf(ptf.ints(), input.getPType())); int[] ss = { sampleSize }; return groupedWeightedReservoirSample(groupedIn, ss, seed) - .parallelDo(new MapFn<Pair<Integer, T>, T>() { + .parallelDo("Extract sampled value from pair", new MapFn<Pair<Integer, T>, T>() { @Override public T map(Pair<Integer, T> p) { return p.second(); @@ -204,10 +204,10 @@ public class Sample { PTableType<Integer, Pair<Double, T>> ptt = ptf.tableOf(ptf.ints(), ptf.pairs(ptf.doubles(), ttype)); - return input.parallelDo(new ReservoirSampleFn<T, N>(sampleSizes, seed, ttype), ptt) + return input.parallelDo("Initial reservoir sampling", new ReservoirSampleFn<T, N>(sampleSizes, seed, ttype), ptt) .groupByKey(1) .combineValues(new WRSCombineFn<T>(sampleSizes, ttype)) - .parallelDo(new MapFn<Pair<Integer, Pair<Double, T>>, Pair<Integer, T>>() { + .parallelDo("Extract sampled 
values", new MapFn<Pair<Integer, Pair<Double, T>>, Pair<Integer, T>>() { @Override public Pair<Integer, T> map(Pair<Integer, Pair<Double, T>> p) { return Pair.of(p.first(), p.second().second()); http://git-wip-us.apache.org/repos/asf/crunch/blob/b6f203e7/crunch-core/src/main/java/org/apache/crunch/lib/Set.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Set.java b/crunch-core/src/main/java/org/apache/crunch/lib/Set.java index 0ba879c..bb16659 100644 --- a/crunch-core/src/main/java/org/apache/crunch/lib/Set.java +++ b/crunch-core/src/main/java/org/apache/crunch/lib/Set.java @@ -42,6 +42,7 @@ public class Set { */ public static <T> PCollection<T> difference(PCollection<T> coll1, PCollection<T> coll2) { return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo( + "Calculate differences of sets", new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() { @Override public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) { @@ -61,6 +62,7 @@ public class Set { */ public static <T> PCollection<T> intersection(PCollection<T> coll1, PCollection<T> coll2) { return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo( + "Calculate intersection of sets", new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, T>() { @Override public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input, Emitter<T> emitter) { @@ -91,6 +93,7 @@ public class Set { PTypeFamily typeFamily = coll1.getTypeFamily(); PType<T> type = coll1.getPType(); return Cogroup.cogroup(toTable(coll1), toTable(coll2)).parallelDo( + "Calculate common values of sets", new DoFn<Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>>, Tuple3<T, T, T>>() { @Override public void process(Pair<T, Pair<Collection<Boolean>, Collection<Boolean>>> input,
