Removes MapElements.MissingOutputTypeDescriptor. This comes from changing MapElements.via(fn).withOutputType(td) to MapElements.into(td).via(fn), which is also shorter.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3bf6c6b3 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3bf6c6b3 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3bf6c6b3 Branch: refs/heads/DSL_SQL Commit: 3bf6c6b3f81c643a1107346674088f554aca29a8 Parents: f314354 Author: Eugene Kirpichov <[email protected]> Authored: Wed Mar 29 13:42:29 2017 -0700 Committer: Eugene Kirpichov <[email protected]> Committed: Wed Apr 12 11:35:05 2017 -0700 ---------------------------------------------------------------------- .../beam/examples/MinimalWordCountJava8.java | 4 +- .../beam/examples/complete/game/GameStats.java | 6 +- .../beam/examples/complete/game/UserScore.java | 5 +- .../examples/MinimalWordCountJava8Test.java | 4 +- .../complete/game/HourlyTeamScoreTest.java | 5 +- .../examples/complete/game/UserScoreTest.java | 6 +- .../beam/runners/direct/DirectRunnerTest.java | 5 +- .../apache/beam/sdk/transforms/MapElements.java | 99 ++++++++++---------- .../beam/sdk/transforms/MapElementsTest.java | 25 +++-- .../sdk/io/gcp/bigquery/BatchLoadBigQuery.java | 7 +- .../sdk/transforms/MapElementsJava8Test.java | 10 +- 11 files changed, 90 insertions(+), 86 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java index 738b64d..0072886 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java @@ -61,8 +61,8 @@ public class MinimalWordCountJava8 { .apply(Filter.by((String word) 
-> !word.isEmpty())) .apply(Count.<String>perElement()) .apply(MapElements - .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) - .withOutputType(TypeDescriptors.strings())) + .into(TypeDescriptors.strings()) + .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to. .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX")); http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index 6874953..9c79fad 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -260,9 +260,9 @@ public class GameStats extends LeaderBoard { // Extract username/score pairs from the event stream PCollection<KV<String, Integer>> userEvents = rawEvents.apply("ExtractUserScore", - MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType( - TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); + MapElements + .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())) + .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))); // Calculate the total score per user over fixed windows, and // cumulative updates for late data. 
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index 7dd5a8e..b4b023f 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -165,9 +165,8 @@ public class UserScore { return gameInfo .apply(MapElements - .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore())) - .withOutputType( - TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))) + .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())) + .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))) .apply(Sum.<String>integersPerKey()); } } http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java index c2f3efe..911ccf6 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java @@ -68,8 +68,8 @@ public class MinimalWordCountJava8Test implements Serializable { .apply(Filter.by((String word) -> !word.isEmpty())) .apply(Count.<String>perElement()) .apply(MapElements - .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()) - .withOutputType(TypeDescriptors.strings())) 
+ .into(TypeDescriptors.strings()) + .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue())) .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix")); } http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java index 40bbfdb..409fc92 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java @@ -101,9 +101,8 @@ public class HourlyTeamScoreTest implements Serializable { -> gInfo.getTimestamp() > startMinTimestamp.getMillis())) // run a map to access the fields in the result. 
.apply(MapElements - .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType( - TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); + .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())) + .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))); PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS); http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java index f0c28ab..2eb63aa 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java @@ -143,9 +143,9 @@ public class UserScoreTest implements Serializable { PCollection<KV<String, Integer>> extract = input .apply(ParDo.of(new ParseEventFn())) .apply( - MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) - .withOutputType( - TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers()))); + MapElements + .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.integers())) + .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore()))); PAssert.that(extract).empty(); http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java 
index 28c24ad..3b81f4d 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java @@ -192,9 +192,10 @@ public class DirectRunnerTest implements Serializable { TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() { }; PCollection<byte[]> foos = - p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td)); + p.apply(Create.of(1, 1, 1, 2, 2, 3)) + .apply(MapElements.into(td).via(getBytes)); PCollection<byte[]> msync = - p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td)); + p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.into(td).via(getBytes)); PCollection<byte[]> bytes = PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections()); PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement()); http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index 421b2ab..82cf753 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.transforms; +import static com.google.common.base.Preconditions.checkNotNull; + +import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; @@ -26,34 +29,27 @@ import org.apache.beam.sdk.values.TypeDescriptor; */ public class MapElements<InputT, OutputT> extends PTransform<PCollection<? 
extends InputT>, PCollection<OutputT>> { - /** - * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor, - * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns - * a {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in - * the input. - * - * <p>Example of use in Java 8: - * <pre>{@code - * PCollection<Integer> wordLengths = words.apply( - * MapElements.via((String word) -> word.length()) - * .withOutputType(new TypeDescriptor<Integer>() {}); - * }</pre> - * - * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type - * descriptor need not be provided. + * Temporarily stores the argument of {@link #into(TypeDescriptor)} until combined with the + * argument of {@link #via(SerializableFunction)} into the fully-specified {@link #fn}. Stays null + * if constructed using {@link #via(SimpleFunction)} directly. */ - public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT> - via(SerializableFunction<? super InputT, OutputT> fn) { + @Nullable private final transient TypeDescriptor<OutputT> outputType; - // TypeDescriptor interacts poorly with the wildcards needed to correctly express - // covariance and contravariance in Java, so instead we cast it to an invariant - // function here. - @SuppressWarnings("unchecked") // safe covariant cast - SerializableFunction<InputT, OutputT> simplerFn = - (SerializableFunction<InputT, OutputT>) fn; + /** + * Non-null on a fully specified transform - is null only when constructed using {@link + * #into(TypeDescriptor)}, until the fn is specified using {@link #via(SerializableFunction)}. 
+ */ + @Nullable private final SimpleFunction<InputT, OutputT> fn; + private final DisplayData.ItemSpec<?> fnClassDisplayData; - return new MissingOutputTypeDescriptor<>(simplerFn); + private MapElements( + @Nullable SimpleFunction<InputT, OutputT> fn, + @Nullable TypeDescriptor<OutputT> outputType, + @Nullable Class<?> fnClass) { + this.fn = fn; + this.outputType = outputType; + this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function"); } /** @@ -77,41 +73,46 @@ extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> { */ public static <InputT, OutputT> MapElements<InputT, OutputT> via( final SimpleFunction<InputT, OutputT> fn) { - return new MapElements<>(fn, fn.getClass()); + return new MapElements<>(fn, null, fn.getClass()); } /** - * An intermediate builder for a {@link MapElements} transform. To complete the transform, provide - * an output type descriptor to {@link MissingOutputTypeDescriptor#withOutputType}. See - * {@link #via(SerializableFunction)} for a full example of use. + * Returns a new {@link MapElements} transform with the given type descriptor for the output + * type, but the mapping function yet to be specified using {@link #via(SerializableFunction)}. 
*/ - public static final class MissingOutputTypeDescriptor<InputT, OutputT> { - - private final SerializableFunction<InputT, OutputT> fn; - - private MissingOutputTypeDescriptor(SerializableFunction<InputT, OutputT> fn) { - this.fn = fn; - } - - public MapElements<InputT, OutputT> withOutputType(final TypeDescriptor<OutputT> outputType) { - return new MapElements<>( - SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType), fn.getClass()); - } - + public static <OutputT> MapElements<?, OutputT> + into(final TypeDescriptor<OutputT> outputType) { + return new MapElements<>(null, outputType, null); } - /////////////////////////////////////////////////////////////////// - - private final SimpleFunction<InputT, OutputT> fn; - private final DisplayData.ItemSpec<?> fnClassDisplayData; - - private MapElements(SimpleFunction<InputT, OutputT> fn, Class<?> fnClass) { - this.fn = fn; - this.fnClassDisplayData = DisplayData.item("mapFn", fnClass).withLabel("Map Function"); + /** + * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type descriptor, + * returns a {@code PTransform} that takes an input {@code PCollection<InputT>} and returns a + * {@code PCollection<OutputT>} containing {@code fn.apply(v)} for every element {@code v} in the + * input. + * + * <p>Example of use in Java 8: + * + * <pre>{@code + * PCollection<Integer> wordLengths = words.apply( + * MapElements.via((String word) -> word.length()) + * .withOutputType(new TypeDescriptor<Integer>() {}); + * }</pre> + * + * <p>In Java 7, the overload {@link #via(SimpleFunction)} is more concise as the output type + * descriptor need not be provided. + */ + public <NewInputT> MapElements<NewInputT, OutputT> via( + SerializableFunction<NewInputT, OutputT> fn) { + return new MapElements<>( + SimpleFunction.fromSerializableFunctionWithOutputType(fn, outputType), + null, + fn.getClass()); } @Override public PCollection<OutputT> expand(PCollection<? 
extends InputT> input) { + checkNotNull(fn, "Must specify a function on MapElements using .via()"); return input.apply( "Map", ParDo.of( http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java index 82e856e..7bf94a0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -152,14 +153,18 @@ public class MapElementsTest implements Serializable { @Test @Category(NeedsRunner.class) public void testMapBasicSerializableFunction() throws Exception { - PCollection<Integer> output = pipeline - .apply(Create.of(1, 2, 3)) - .apply(MapElements.via(new SerializableFunction<Integer, Integer>() { - @Override - public Integer apply(Integer input) { - return -input; - } - }).withOutputType(new TypeDescriptor<Integer>() {})); + PCollection<Integer> output = + pipeline + .apply(Create.of(1, 2, 3)) + .apply( + MapElements.into(TypeDescriptors.integers()) + .via( + new SerializableFunction<Integer, Integer>() { + @Override + public Integer apply(Integer input) { + return -input; + } + })); PAssert.that(output).containsInAnyOrder(-2, -1, -3); pipeline.run(); @@ -210,8 +215,8 @@ public class MapElementsTest implements Serializable { } }; - MapElements<?, ?> serializableMap = 
MapElements.via(serializableFn) - .withOutputType(TypeDescriptor.of(Integer.class)); + MapElements<?, ?> serializableMap = + MapElements.into(TypeDescriptors.integers()).via(serializableFn); assertThat(DisplayData.from(serializableMap), hasDisplayItem("mapFn", serializableFn.getClass())); } http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java index c323858..160b231 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BatchLoadBigQuery.java @@ -103,10 +103,9 @@ class BatchLoadBigQuery<T> extends PTransform<PCollection<T>, WriteResult> { if (write.getFormatFunction() == BigQueryIO.IDENTITY_FORMATTER) { inputInGlobalWindow = (PCollection<TableRow>) typedInputInGlobalWindow; } else { - inputInGlobalWindow = typedInputInGlobalWindow - .apply(MapElements.via(write.getFormatFunction()) - .withOutputType(new TypeDescriptor<TableRow>() { - })); + inputInGlobalWindow = + typedInputInGlobalWindow.apply( + MapElements.into(new TypeDescriptor<TableRow>() {}).via(write.getFormatFunction())); } // PCollection of filename, file byte size. 
http://git-wip-us.apache.org/repos/asf/beam/blob/3bf6c6b3/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java ---------------------------------------------------------------------- diff --git a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java index e0e9d9b4..dbd5ef3 100644 --- a/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java +++ b/sdks/java/java8tests/src/test/java/org/apache/beam/sdk/transforms/MapElementsJava8Test.java @@ -21,7 +21,7 @@ import java.io.Serializable; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,8 +47,8 @@ public class MapElementsJava8Test implements Serializable { .apply(Create.of(1, 2, 3)) .apply(MapElements // Note that the type annotation is required. - .via((Integer i) -> i * 2) - .withOutputType(new TypeDescriptor<Integer>() {})); + .into(TypeDescriptors.integers()) + .via((Integer i) -> i * 2)); PAssert.that(output).containsInAnyOrder(6, 2, 4); pipeline.run(); @@ -82,8 +82,8 @@ public class MapElementsJava8Test implements Serializable { .apply(Create.of(1, 2, 3)) .apply(MapElements // Note that the type annotation is required. - .via(new Doubler()::doubleIt) - .withOutputType(new TypeDescriptor<Integer>() {})); + .into(TypeDescriptors.integers()) + .via(new Doubler()::doubleIt)); PAssert.that(output).containsInAnyOrder(6, 2, 4); pipeline.run();
