Updated Branches: refs/heads/master 012e924e6 -> 457a067b6
CRUNCH-48: Add length limit options to the String concatenation combiner. Contributed by Gauthier Ambard. Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/457a067b Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/457a067b Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/457a067b Branch: refs/heads/master Commit: 457a067b688ab129dc3adc01f5aacc2fa7114dbc Parents: 012e924 Author: jwills <[email protected]> Authored: Mon Aug 20 00:38:42 2012 -0700 Committer: jwills <[email protected]> Committed: Mon Aug 20 00:38:42 2012 -0700 ---------------------------------------------------------------------- .../src/main/java/org/apache/crunch/CombineFn.java | 54 ++++++++++++++- .../test/java/org/apache/crunch/CombineFnTest.java | 7 ++- 2 files changed, 57 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/457a067b/crunch/src/main/java/org/apache/crunch/CombineFn.java ---------------------------------------------------------------------- diff --git a/crunch/src/main/java/org/apache/crunch/CombineFn.java b/crunch/src/main/java/org/apache/crunch/CombineFn.java index 27183a9..88fbbaf 100644 --- a/crunch/src/main/java/org/apache/crunch/CombineFn.java +++ b/crunch/src/main/java/org/apache/crunch/CombineFn.java @@ -335,9 +335,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, return aggregator(new LastNAggregator<V>(n)); } - /** - * Used to concatenate strings, with a separator between each strings. + * Used to concatenate strings, with a separator between each pair of strings. + * There is no limit on the length of the concatenated string.
* * @param separator * the separator which will be appended between each string @@ -348,7 +348,35 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, * @return */ public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull) { - return aggregator(new StringConcatAggregator(separator, skipNull)); + return aggregator(new StringConcatAggregator(separator, skipNull)); + } + + /** + * Used to concatenate strings, with a separator between each strings. You + * can specify the maximum length of the output string and of the input + * strings, if they are > 0. If a value is <= 0, there is no limits. + * + * Any too large string (or any string which would made the output too + * large) will be silently discarded. + * + * @param separator + * the separator which will be appended between each string + * @param skipNull + * define if we should skip null values. Throw + * NullPointerException if set to false and there is a null + * value. + * @param maxOutputLength + * the maximum length of the output string. If it's set <= 0, + * there is no limits. The number of characters of the output + * string will be < maxOutputLength. + * @param maxInputLength + * the maximum length of the input strings. If it's set <= 0, + * there is no limits. The number of characters of the int string + * will be < maxInputLength to be concatenated. 
+ * @return + */ + public static final <K> CombineFn<K, String> STRING_CONCAT(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) { + return aggregator(new StringConcatAggregator(separator, skipNull, maxOutputLength, maxInputLength)); } public static class SumLongs implements Aggregator<Long> { @@ -870,6 +898,9 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, public static class StringConcatAggregator implements Aggregator<String> { private final String separator; private final boolean skipNulls; + private final long maxOutputLength; + private final long maxInputLength; + private long currentLength; private final LinkedList<String> list = new LinkedList<String>(); private transient Joiner joiner; @@ -877,6 +908,16 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, public StringConcatAggregator(final String separator, final boolean skipNulls) { this.separator = separator; this.skipNulls = skipNulls; + this.maxInputLength = 0; + this.maxOutputLength = 0; + } + + public StringConcatAggregator(final String separator, final boolean skipNull, final long maxOutputLength, final long maxInputLength) { + this.separator = separator; + this.skipNulls = skipNull; + this.maxOutputLength = maxOutputLength; + this.maxInputLength = maxInputLength; + this.currentLength = -separator.length(); } @Override @@ -889,6 +930,13 @@ public abstract class CombineFn<S, T> extends DoFn<Pair<S, Iterable<T>>, Pair<S, @Override public void update(final String next) { + long length = (next == null) ? 
0 : next.length() + separator.length(); + if (maxOutputLength > 0 && currentLength + length > maxOutputLength || maxInputLength > 0 && next.length() > maxInputLength) { + return; + } + if (maxOutputLength > 0) { + currentLength += length; + } list.add(next); } http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/457a067b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java ---------------------------------------------------------------------- diff --git a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java index 82bdf00..af67ec3 100644 --- a/crunch/src/test/java/org/apache/crunch/CombineFnTest.java +++ b/crunch/src/test/java/org/apache/crunch/CombineFnTest.java @@ -191,13 +191,18 @@ public class CombineFnTest { String[] arrayNull = new String[] { null, "" }; assertEquals(ImmutableList.of("foofoobarbar"), applyAggregator( new StringConcatAggregator("", true), ImmutableList.of("foo", "foobar", "bar"))); - assertEquals(ImmutableList.of("foo/foobar/bar"), applyAggregator( new StringConcatAggregator("/", false), ImmutableList.of("foo", "foobar", "bar"))); assertEquals(ImmutableList.of(" "), applyAggregator( new StringConcatAggregator(" ", true), ImmutableList.of(" ", ""))); assertEquals(ImmutableList.of(""), applyAggregator( new StringConcatAggregator(" ", true), Arrays.asList(arrayNull))); + assertEquals(ImmutableList.of("foo bar"), applyAggregator( + new StringConcatAggregator(" ", true, 20, 3), ImmutableList.of("foo", "foobar", "bar"))); + assertEquals(ImmutableList.of("foo foobar"), applyAggregator( + new StringConcatAggregator(" ", true, 10, 6), ImmutableList.of("foo", "foobar", "bar"))); + assertEquals(ImmutableList.of("foo bar"), applyAggregator( + new StringConcatAggregator(" ", true, 9, 6), ImmutableList.of("foo", "foobar", "bar"))); } @Test(expected = NullPointerException.class)
