Repository: crunch
Updated Branches:
  refs/heads/master 1252e7f91 -> 65f39198e

CRUNCH-599: Fix increment and incrementIf methods in crunch-lambda so they also emit the incoming element

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/65f39198
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/65f39198
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/65f39198

Branch: refs/heads/master
Commit: 65f39198ebc9ba5f1557afd4e350227919c80229
Parents: 1252e7f
Author: David Whiting <[email protected]>
Authored: Thu Mar 31 12:06:45 2016 +0200
Committer: David Whiting <[email protected]>
Committed: Thu Mar 31 12:06:45 2016 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lambda/LCollection.java | 12 ++++++++++--
 .../src/main/java/org/apache/crunch/lambda/LTable.java | 12 ++++++++++--
 2 files changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/65f39198/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
index 6a8dd62..a7ca310 100644
--- a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LCollection.java
@@ -141,14 +141,20 @@ public interface LCollection<S> {
    * Increment a counter for every element in the collection
    */
   default LCollection<S> increment(Enum<?> counter) {
-    return parallelDo(ctx -> ctx.increment(counter), pType());
+    return parallelDo(ctx -> {
+      ctx.increment(counter);
+      ctx.emit(ctx.element());
+    }, pType());
   }
 
   /**
    * Increment a counter for every element in the collection
    */
   default LCollection<S> increment(String counterGroup, String counterName) {
-    return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType());
+    return parallelDo(ctx -> {
+      ctx.increment(counterGroup, counterName);
+      ctx.emit(ctx.element());
+    }, pType());
   }
 
   /**
@@ -157,6 +163,7 @@ public interface LCollection<S> {
   default LCollection<S> incrementIf(Enum<?> counter, SPredicate<S> condition) {
     return parallelDo(ctx -> {
       if (condition.test(ctx.element())) ctx.increment(counter);
+      ctx.emit(ctx.element());
     }, pType());
   }
 
@@ -166,6 +173,7 @@ public interface LCollection<S> {
   default LCollection<S> incrementIf(String counterGroup, String counterName, SPredicate<S> condition) {
     return parallelDo(ctx -> {
       if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+      ctx.emit(ctx.element());
     }, pType());
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/65f39198/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
index 9f6616e..8360a33 100644
--- a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
@@ -185,18 +185,25 @@ public interface LTable<K, V> extends LCollection<Pair<K, V>> {
 
   /** {@inheritDoc} */
   default LTable<K, V> increment(Enum<?> counter) {
-    return parallelDo(ctx -> ctx.increment(counter), pType());
+    return parallelDo(ctx -> {
+      ctx.increment(counter);
+      ctx.emit(ctx.element());
+    }, pType());
   }
 
   /** {@inheritDoc} */
   default LTable<K, V> increment(String counterGroup, String counterName) {
-    return parallelDo(ctx -> ctx.increment(counterGroup, counterName), pType());
+    return parallelDo(ctx -> {
+      ctx.increment(counterGroup, counterName);
+      ctx.emit(ctx.element());
+    }, pType());
   }
 
   /** {@inheritDoc} */
   default LTable<K, V> incrementIf(Enum<?> counter, SPredicate<Pair<K, V>> condition) {
     return parallelDo(ctx -> {
       if (condition.test(ctx.element())) ctx.increment(counter);
+      ctx.emit(ctx.element());
     }, pType());
   }
 
@@ -204,6 +211,7 @@ public interface LTable<K, V> extends LCollection<Pair<K, V>> {
   default LTable<K, V> incrementIf(String counterGroup, String counterName, SPredicate<Pair<K, V>> condition) {
     return parallelDo(ctx -> {
       if (condition.test(ctx.element())) ctx.increment(counterGroup, counterName);
+      ctx.emit(ctx.element());
     }, pType());
   }
 }
