Repository: beam Updated Branches: refs/heads/master 83f8c460c -> 47304d1fc
Refactored existing code. Added iterable and KV. Changed from element to of. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e01ce864 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e01ce864 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e01ce864 Branch: refs/heads/master Commit: e01ce864edf551afefe861041541bb2a05340a08 Parents: 83f8c46 Author: Jesse Anderson <[email protected]> Authored: Tue Jan 24 08:37:33 2017 -0800 Committer: Dan Halperin <[email protected]> Committed: Thu Jan 26 22:52:09 2017 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/ToString.java | 168 ++++++++++++++++--- .../java/org/apache/beam/sdk/io/WriteTest.java | 2 +- .../beam/sdk/transforms/ToStringTest.java | 86 ++++++++-- 3 files changed, 226 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java index ef49267..d5c9784 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java @@ -18,51 +18,181 @@ package org.apache.beam.sdk.transforms; +import java.util.Iterator; + +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** - * {@link PTransform PTransforms} for converting a {@link PCollection PCollection<T>} to a - * {@link PCollection PCollection<String>}. 
- * - * <p>Example of use: - * <pre> {@code - * PCollection<Long> longs = ...; - * PCollection<String> strings = longs.apply(ToString.<Long>element()); - * } </pre> - * + * {@link PTransform PTransforms} for converting a {@link PCollection PCollection<?>}, + * {@link PCollection PCollection<KV<?,?>>}, or + * {@link PCollection PCollection<Iterable<?>>} + * to a {@link PCollection PCollection<String>}. * * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} */ public final class ToString { + private ToString() { + // do not instantiate + } /** * Returns a {@code PTransform<PCollection, PCollection<String>>} which transforms each * element of the input {@link PCollection} to a {@link String} using the * {@link Object#toString} method. */ - public static PTransform<PCollection<?>, PCollection<String>> element() { - return new Default(); + public static PTransform<PCollection<?>, PCollection<String>> of() { + return new SimpleToString(); } - private ToString() { + /** + * Returns a {@code PTransform<PCollection<KV<?,?>>, PCollection<String>>} which transforms each + * element of the input {@link PCollection} to a {@link String} by using the + * {@link Object#toString} on the key followed by a "," followed by the {@link Object#toString} + * of the value. + */ + public static PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> kv() { + return kv(","); + } + + /** + * Returns a {@code PTransform<PCollection<KV<?,?>>, PCollection<String>>} which transforms each + * element of the input {@link PCollection} to a {@link String} by using the + * {@link Object#toString} on the key followed by the specified delimiter followed by the + * {@link Object#toString} of the value. + * @param delimiter The delimiter to put between the key and value + */ + public static PTransform<PCollection<? 
extends KV<?, ?>>, + PCollection<String>> kv(String delimiter) { + return new KVToString(delimiter); + } + + /** + * Returns a {@code PTransform<PCollection<Iterable<?>>, PCollection<String>>} which + * transforms each item in the iterable of the input {@link PCollection} to a {@link String} + * using the {@link Object#toString} method followed by a "," until + * the last element in the iterable. There is no trailing delimiter. + */ + public static PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> iterable() { + return iterable(","); + } + + /** + * Returns a {@code PTransform<PCollection<Iterable<?>>, PCollection<String>>} which + * transforms each item in the iterable of the input {@link PCollection} to a {@link String} + * using the {@link Object#toString} method followed by the specified delimiter until + * the last element in the iterable. There is no trailing delimiter. + * @param delimiter The delimiter to put between the items in the iterable. + */ + public static PTransform<PCollection<? extends Iterable<?>>, + PCollection<String>> iterable(String delimiter) { + return new IterablesToString(delimiter); } /** * A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection<String>} * using the {@link Object#toString} method. 
+ * + * <p>Example of use: + * <pre>{@code + * PCollection<Long> longs = ...; + * PCollection<String> strings = longs.apply(ToString.of()); + * }</pre> + * + * + * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your own + * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} */ - private static final class Default extends PTransform<PCollection<?>, PCollection<String>> { + private static final class SimpleToString extends + PTransform<PCollection<?>, PCollection<String>> { @Override public PCollection<String> expand(PCollection<?> input) { - return input.apply(MapElements.via(new ToStringFunction<>())); + return input.apply(MapElements.via(new SimpleFunction<Object, String>() { + @Override + public String apply(Object input) { + return input.toString(); + } + })); } + } + + /** + * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a + * {@code PCollection<String>} using the {@link Object#toString} method for + * the key and value and an optional delimiter. + * + * <p>Example of use: + * <pre>{@code + * PCollection<KV<String, Long>> nameToLong = ...; + * PCollection<String> strings = nameToLong.apply(ToString.kv()); + * }</pre> + * + * + * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your + * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + */ + private static final class KVToString extends + PTransform<PCollection<? extends KV<?, ?>>, PCollection<String>> { + private final String delimiter; + + public KVToString(String delimiter) { + this.delimiter = delimiter; + } + + @Override + public PCollection<String> expand(PCollection<? 
extends KV<?, ?>> input) { + return input.apply(MapElements.via(new SimpleFunction<KV<?, ?>, String>() { + @Override + public String apply(KV<?, ?> input) { + return input.getKey().toString() + delimiter + input.getValue().toString(); + } + })); + } + } + + /** + * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a + * {@code PCollection<String>} using the {@link Object#toString} method and + * an optional delimiter. + * + * <p>Example of use: + * <pre>{@code + * PCollection<Iterable<Long>> longs = ...; + * PCollection<String> strings = longs.apply(ToString.iterable()); + * }</pre> + * + * + * <p><b>Note</b>: For any custom string conversion and formatting, we recommend applying your + * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + */ + private static final class IterablesToString extends + PTransform<PCollection<? extends Iterable<?>>, PCollection<String>> { + private final String delimiter; + + public IterablesToString(String delimiter) { + this.delimiter = delimiter; + } + + @Override + public PCollection<String> expand(PCollection<? 
extends Iterable<?>> input) { + return input.apply(MapElements.via(new SimpleFunction<Iterable<?>, String>() { + @Override + public String apply(Iterable<?> input) { + StringBuilder builder = new StringBuilder(); + Iterator iterator = input.iterator(); + + while (iterator.hasNext()) { + builder.append(iterator.next().toString()); + + if (iterator.hasNext()) { + builder.append(delimiter); + } + } - private static class ToStringFunction<T> extends SimpleFunction<T, String> { - @Override - public String apply(T input) { - return input.toString(); - } + return builder.toString(); + } + })); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 9772b9b..f81cc0c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -297,7 +297,7 @@ public class WriteTest { @Test public void testWriteUnbounded() { PCollection<String> unbounded = p.apply(CountingInput.unbounded()) - .apply(ToString.element()); + .apply(ToString.of()); TestSink sink = new TestSink(); thrown.expect(IllegalArgumentException.class); http://git-wip-us.apache.org/repos/asf/beam/blob/e01ce864/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java index e5c9f05..ab984f1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java @@ -20,10 
+20,13 @@ package org.apache.beam.sdk.transforms; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; + +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; import org.junit.Test; @@ -41,19 +44,82 @@ public class ToStringTest { @Test @Category(RunnableOnService.class) - public void testToStringElement() { + public void testToStringOf() { Integer[] ints = {1, 2, 3, 4, 5}; + String[] strings = {"1", "2", "3", "4", "5"}; PCollection<Integer> input = p.apply(Create.of(Arrays.asList(ints))); - PCollection<String> output = input.apply(ToString.<Integer>element()); - PAssert.that(output).containsInAnyOrder(toStringList(ints)); + PCollection<String> output = input.apply(ToString.of()); + PAssert.that(output).containsInAnyOrder(strings); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testToStringKV() { + ArrayList<KV<String, Integer>> kvs = new ArrayList<>(); + kvs.add(KV.of("one", 1)); + kvs.add(KV.of("two", 2)); + + ArrayList<String> expected = new ArrayList<>(); + expected.add("one,1"); + expected.add("two,2"); + + PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs)); + PCollection<String> output = input.apply(ToString.kv()); + PAssert.that(output).containsInAnyOrder(expected); p.run(); } - private List<String> toStringList(Object[] ints) { - List<String> ll = new ArrayList<>(ints.length); - for (Object i : ints) { - ll.add(i.toString()); - } - return ll; + @Test + @Category(RunnableOnService.class) + public void testToStringKVWithDelimiter() { + ArrayList<KV<String, Integer>> kvs = new ArrayList<>(); + kvs.add(KV.of("one", 1)); + kvs.add(KV.of("two", 2)); + + ArrayList<String> expected = new ArrayList<>(); 
+ expected.add("one\t1"); + expected.add("two\t2"); + + PCollection<KV<String, Integer>> input = p.apply(Create.of(kvs)); + PCollection<String> output = input.apply(ToString.kv("\t")); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testToStringIterable() { + ArrayList<Iterable<String>> iterables = new ArrayList<>(); + iterables.add(Arrays.asList(new String[]{"one", "two", "three"})); + iterables.add(Arrays.asList(new String[]{"four", "five", "six"})); + + ArrayList<String> expected = new ArrayList<>(); + expected.add("one,two,three"); + expected.add("four,five,six"); + + PCollection<Iterable<String>> input = p.apply(Create.of(iterables) + .withCoder(IterableCoder.of(StringUtf8Coder.of()))); + PCollection<String> output = input.apply(ToString.iterable()); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testToStringIterableWithDelimiter() { + ArrayList<Iterable<String>> iterables = new ArrayList<>(); + iterables.add(Arrays.asList(new String[]{"one", "two", "three"})); + iterables.add(Arrays.asList(new String[]{"four", "five", "six"})); + + ArrayList<String> expected = new ArrayList<>(); + expected.add("one\ttwo\tthree"); + expected.add("four\tfive\tsix"); + + PCollection<Iterable<String>> input = p.apply(Create.of(iterables) + .withCoder(IterableCoder.of(StringUtf8Coder.of()))); + PCollection<String> output = input.apply(ToString.iterable("\t")); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); } }
