Removes FlatMapElements.MissingOutputTypeDescriptor. This comes from changing FlatMapElements.via(fn).withOutputType(td) to FlatMapElements.into(td).via(fn), which is also shorter.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/831b11fb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/831b11fb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/831b11fb Branch: refs/heads/DSL_SQL Commit: 831b11fb712049ce78965ec2e677f9cf9ea66fc4 Parents: 3bf6c6b Author: Eugene Kirpichov <[email protected]> Authored: Wed Mar 29 13:56:00 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Apr 12 11:35:05 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/MinimalWordCountJava8.java | 5 +- .../examples/MinimalWordCountJava8Test.java | 5 +- .../beam/sdk/transforms/FlatMapElements.java | 113 +++++++++---------- .../transforms/FlatMapElementsJava8Test.java | 10 +- 4 files changed, 63 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index 0072886..f424a7b 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -56,8 +56,9 @@ public class MinimalWordCountJava8 { Pipeline p = Pipeline.create(options); p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) - .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) - .withOutputType(TypeDescriptors.strings())) + .apply(FlatMapElements + .into(TypeDescriptors.strings()) + .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))) .apply(Filter.by((String word) -> 
!word.isEmpty())) .apply(Count.<String>perElement()) .apply(MapElements http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index 911ccf6..6c66d8f 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -63,8 +63,9 @@ public class MinimalWordCountJava8Test implements Serializable { p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil()); p.apply(TextIO.Read.from("gs://apache-beam-samples/shakespeare/*")) - .apply(FlatMapElements.via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+"))) - .withOutputType(TypeDescriptors.strings())) + .apply(FlatMapElements + .into(TypeDescriptors.strings()) + .via((String word) -> Arrays.asList(word.split("[^a-zA-Z']+")))) .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.<String>perElement()) .apply(MapElements http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index c165f7f..0983165 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -17,7 +17,10 @@ */ package org.apache.beam.sdk.transforms; +import static com.google.common.base.Preconditions.checkNotNull; + import 
java.lang.reflect.ParameterizedType; +import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -30,32 +33,29 @@ import org.apache.beam.sdk.values.TypeDescriptors; public class FlatMapElements<InputT, OutputT> extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { /** - * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, - * returns a {@link PTransform} that applies {@code fn} to every element of the input - * {@code PCollection<InputT>} and outputs all of the elements to the output - * {@code PCollection<OutputT>}. - * - * <p>Example of use in Java 8: - * <pre>{@code - * PCollection<String> words = lines.apply( - * FlatMapElements.via((String line) -> Arrays.asList(line.split(" "))) - * .withOutputType(new TypeDescriptor<String>(){}); - * }</pre> - * - * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type - * descriptor need not be provided. + * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the + * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null + * if constructed using {@link #via(SimpleFunction)} directly. + */ + @Nullable + private final transient TypeDescriptor<Iterable<OutputT>> outputType; + + /** + * Non-null on a fully specified transform - is null only when constructed using {@link + * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}. */ - public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT> - via(SerializableFunction<? super InputT, ? 
extends Iterable<OutputT>> fn) { + @Nullable + private final SimpleFunction<InputT, Iterable<OutputT>> fn; + private final DisplayData.ItemSpec<?> fnClassDisplayData; - // TypeDescriptor interacts poorly with the wildcards needed to correctly express - // covariance and contravariance in Java, so instead we cast it to an invariant - // function here. - @SuppressWarnings("unchecked") // safe covariant cast - SerializableFunction<InputT, Iterable<OutputT>> simplerFn = - (SerializableFunction<InputT, Iterable<OutputT>>) fn; + private FlatMapElements( + @Nullable SimpleFunction<InputT, Iterable<OutputT>> fn, + @Nullable TypeDescriptor<Iterable<OutputT>> outputType, + @Nullable Class<?> fnClass) { + this.fn = fn; + this.outputType = outputType; + this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function"); - return new MissingOutputTypeDescriptor<>(simplerFn); } /** @@ -82,54 +82,45 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { */ public static <InputT, OutputT> FlatMapElements<InputT, OutputT> via(SimpleFunction<? super InputT, ? extends Iterable<OutputT>> fn) { - // TypeDescriptor interacts poorly with the wildcards needed to correctly express - // covariance and contravariance in Java, so instead we cast it to an invariant - // function here. - @SuppressWarnings("unchecked") // safe covariant cast - SimpleFunction<InputT, Iterable<OutputT>> simplerFn = - (SimpleFunction<InputT, Iterable<OutputT>>) fn; - - return new FlatMapElements<>(simplerFn, fn.getClass()); + return new FlatMapElements(fn, null, fn.getClass()); } /** - * An intermediate builder for a {@link FlatMapElements} transform. To complete the transform, - * provide an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See - * {@link #via(SerializableFunction)} for a full example of use. 
+ * Returns a new {@link FlatMapElements} transform with the given type descriptor for the output + * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}. */ - public static final class MissingOutputTypeDescriptor<InputT, OutputT> { - - private final SerializableFunction<InputT, Iterable<OutputT>> fn; - - private MissingOutputTypeDescriptor( - SerializableFunction<InputT, Iterable<OutputT>> fn) { - this.fn = fn; - } - - public FlatMapElements<InputT, OutputT> withOutputType(TypeDescriptor<OutputT> outputType) { - TypeDescriptor<Iterable<OutputT>> iterableOutputType = TypeDescriptors.iterables(outputType); - - return new FlatMapElements<>( - SimpleFunction.fromSerializableFunctionWithOutputType(fn, - iterableOutputType), - fn.getClass()); - } + public static <OutputT> FlatMapElements<?, OutputT> + into(final TypeDescriptor<OutputT> outputType) { + return new FlatMapElements<>(null, TypeDescriptors.iterables(outputType), null); } - ////////////////////////////////////////////////////////////////////////////////////////////////// - - private final SimpleFunction<InputT, ? extends Iterable<OutputT>> fn; - private final DisplayData.ItemSpec<?> fnClassDisplayData; - - private FlatMapElements( - SimpleFunction<InputT, ? extends Iterable<OutputT>> fn, - Class<?> fnClass) { - this.fn = fn; - this.fnClassDisplayData = DisplayData.item("flatMapFn", fnClass).withLabel("FlatMap Function"); + /** + * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>} {@code fn}, + * returns a {@link PTransform} that applies {@code fn} to every element of the input + * {@code PCollection<InputT>} and outputs all of the elements to the output + * {@code PCollection<OutputT>}. 
+ * + * <p>Example of use in Java 8: + * <pre>{@code + * PCollection<String> words = lines.apply( + * FlatMapElements.via((String line) -> Arrays.asList(line.split(" "))) + * .withOutputType(new TypeDescriptor<String>(){}); + * }</pre> + * + * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type + * descriptor need not be provided. + */ + public <NewInputT> FlatMapElements<NewInputT, OutputT> + via(SerializableFunction<NewInputT, ? extends Iterable<OutputT>> fn) { + return new FlatMapElements( + SimpleFunction.fromSerializableFunctionWithOutputType(fn, (TypeDescriptor) outputType), + null, + fn.getClass()); } @Override public PCollection<OutputT> expand(PCollection<? extends InputT> input) { + checkNotNull(fn, "Must specify a function on FlatMapElements using .via()"); return input.apply( "FlatMap", ParDo.of( http://git-wip-us.apache.org/repos/asf/beam/blob/831b11fb/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java index 471724d..501b0d1 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/FlatMapElementsJava8Test.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -52,8 +52,8 @@ public class FlatMapElementsJava8Test implements Serializable { .apply(Create.of(1, 
2, 3)) .apply(FlatMapElements // Note that the input type annotation is required. - .via((Integer i) -> ImmutableList.of(i, -i)) - .withOutputType(new TypeDescriptor<Integer>() {})); + .into(TypeDescriptors.integers()) + .via((Integer i) -> ImmutableList.of(i, -i))); PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); pipeline.run(); @@ -69,8 +69,8 @@ public class FlatMapElementsJava8Test implements Serializable { .apply(Create.of(1, 2, 3)) .apply(FlatMapElements // Note that the input type annotation is required. - .via(new Negater()::numAndNegation) - .withOutputType(new TypeDescriptor<Integer>() {})); + .into(TypeDescriptors.integers()) + .via(new Negater()::numAndNegation)); PAssert.that(output).containsInAnyOrder(1, 3, -1, -3, 2, -2); pipeline.run();
