Repository: crunch
Updated Branches:
  refs/heads/master 7d7af4ef4 -> e59cb17a4
CRUNCH-587: Add missing filter(), filterByKey() and filterByValue() functions from Lambda LTable implementation

Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e59cb17a
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e59cb17a
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e59cb17a

Branch: refs/heads/master
Commit: e59cb17a4ddaee203c97f574be29cfc98ade246a
Parents: 7d7af4e
Author: David Whiting <[email protected]>
Authored: Mon Jan 18 15:07:13 2016 +0100
Committer: David Whiting <[email protected]>
Committed: Mon Jan 18 15:13:01 2016 +0100

----------------------------------------------------------------------
 .../java/org/apache/crunch/lambda/LTable.java | 21 ++++++++++++++++++++
 1 file changed, 21 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/crunch/blob/e59cb17a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
----------------------------------------------------------------------
diff --git a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
index 0b4e4fa..9f6616e 100644
--- a/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
+++ b/crunch-lambda/src/main/java/org/apache/crunch/lambda/LTable.java
@@ -97,6 +97,27 @@ public interface LTable<K, V> extends LCollection<Pair<K, V>> {
     }
 
     /**
+     * Filter the rows of the table using the supplied predicate.
+     */
+    default LTable<K, V> filter(SPredicate<Pair<K, V>> predicate) {
+        return parallelDo(ctx -> { if (predicate.test(ctx.element())) ctx.emit(ctx.element());}, pType());
+    }
+
+    /**
+     * Filter the rows of the table using the supplied predicate applied to the key part of each record.
+     */
+    default LTable<K, V> filterByKey(SPredicate<K> predicate) {
+        return parallelDo(ctx -> { if (predicate.test(ctx.element().first())) ctx.emit(ctx.element());}, pType());
+    }
+
+    /**
+     * Filter the rows of the table using the supplied predicate applied to the value part of each record.
+     */
+    default LTable<K, V> filterByValue(SPredicate<V> predicate) {
+        return parallelDo(ctx -> { if (predicate.test(ctx.element().second())) ctx.emit(ctx.element());}, pType());
+    }
+
+    /**
      * Join this table to another {@link LTable} which has the same key type using the provided {@link JoinType} and
      * {@link JoinStrategy}
      */
