Fix compile after ParDo refactor
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bd93c8b5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bd93c8b5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bd93c8b5 Branch: refs/heads/master Commit: bd93c8b55ba6f81c87b74364b26d64e0f8c1103f Parents: 7bfc982 Author: Ismaël Mejía <[email protected]> Authored: Wed Mar 29 10:10:13 2017 +0200 Committer: Ismaël Mejía <[email protected]> Committed: Wed Aug 23 19:07:27 2017 +0200 ---------------------------------------------------------------------- .../beam/integration/nexmark/NexmarkQuery.java | 14 ++++++------- .../beam/integration/nexmark/NexmarkRunner.java | 3 +-- .../beam/integration/nexmark/NexmarkUtils.java | 16 +++++++------- .../integration/nexmark/queries/Query7.java | 22 ++++++++++---------- 4 files changed, 27 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java index c268a3b..e1cd493 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkQuery.java @@ -97,7 +97,7 @@ public abstract class NexmarkQuery }; /** Transform to key each person by their id. 
*/ - protected static final ParDo.Bound<Person, KV<Long, Person>> PERSON_BY_ID = + protected static final ParDo.SingleOutput<Person, KV<Long, Person>> PERSON_BY_ID = ParDo.of(new DoFn<Person, KV<Long, Person>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -106,7 +106,7 @@ public abstract class NexmarkQuery }); /** Transform to key each auction by its id. */ - protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_ID = + protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_ID = ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -115,7 +115,7 @@ public abstract class NexmarkQuery }); /** Transform to key each auction by its seller id. */ - protected static final ParDo.Bound<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = + protected static final ParDo.SingleOutput<Auction, KV<Long, Auction>> AUCTION_BY_SELLER = ParDo.of(new DoFn<Auction, KV<Long, Auction>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -124,7 +124,7 @@ public abstract class NexmarkQuery }); /** Transform to key each bid by it's auction id. */ - protected static final ParDo.Bound<Bid, KV<Long, Bid>> BID_BY_AUCTION = + protected static final ParDo.SingleOutput<Bid, KV<Long, Bid>> BID_BY_AUCTION = ParDo.of(new DoFn<Bid, KV<Long, Bid>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -133,7 +133,7 @@ public abstract class NexmarkQuery }); /** Transform to project the auction id from each bid. */ - protected static final ParDo.Bound<Bid, Long> BID_TO_AUCTION = + protected static final ParDo.SingleOutput<Bid, Long> BID_TO_AUCTION = ParDo.of(new DoFn<Bid, Long>() { @ProcessElement public void processElement(ProcessContext c) { @@ -142,7 +142,7 @@ public abstract class NexmarkQuery }); /** Transform to project the price from each bid. 
*/ - protected static final ParDo.Bound<Bid, Long> BID_TO_PRICE = + protected static final ParDo.SingleOutput<Bid, Long> BID_TO_PRICE = ParDo.of(new DoFn<Bid, Long>() { @ProcessElement public void processElement(ProcessContext c) { @@ -151,7 +151,7 @@ public abstract class NexmarkQuery }); /** Transform to emit each event with the timestamp embedded within it. */ - public static final ParDo.Bound<Event, Event> EVENT_TIMESTAMP_FROM_DATA = + public static final ParDo.SingleOutput<Event, Event> EVENT_TIMESTAMP_FROM_DATA = ParDo.of(new DoFn<Event, Event>() { @ProcessElement public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java index e8d791f..df1000a 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkRunner.java @@ -1073,8 +1073,7 @@ public abstract class NexmarkRunner<OptionT extends NexmarkOptions> { case BIGQUERY: // Multiple BigQuery backends to mimic what most customers do. 
PCollectionTuple res = formattedResults.apply(queryName + ".Partition", - ParDo.withOutputTags(MAIN, TupleTagList.of(SIDE)) - .of(new PartitionDoFn())); + ParDo.of(new PartitionDoFn()).withOutputTags(MAIN, TupleTagList.of(SIDE))); sinkResultsToBigQuery(res.get(MAIN), now, "main"); sinkResultsToBigQuery(res.get(SIDE), now, "side"); sinkResultsToBigQuery(formattedResults, now, "copy"); http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java index b0421a4..a47ebcc 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/NexmarkUtils.java @@ -417,7 +417,7 @@ public class NexmarkUtils { /** * Return a transform to pass-through events, but count them as they go by. */ - public static ParDo.Bound<Event, Event> snoop(final String name) { + public static ParDo.SingleOutput<Event, Event> snoop(final String name) { return ParDo.of(new DoFn<Event, Event>() { final Aggregator<Long, Long> eventCounter = createAggregator("events", Sum.ofLongs()); @@ -451,7 +451,7 @@ public class NexmarkUtils { /** * Return a transform to count and discard each element. */ - public static <T> ParDo.Bound<T, Void> devNull(String name) { + public static <T> ParDo.SingleOutput<T, Void> devNull(String name) { return ParDo.of(new DoFn<T, Void>() { final Aggregator<Long, Long> discardCounter = createAggregator("discarded", Sum.ofLongs()); @@ -466,7 +466,7 @@ public class NexmarkUtils { /** * Return a transform to log each element, passing it through unchanged. 
*/ - public static <T> ParDo.Bound<T, T> log(final String name) { + public static <T> ParDo.SingleOutput<T, T> log(final String name) { return ParDo.of(new DoFn<T, T>() { @ProcessElement public void processElement(ProcessContext c) { @@ -479,7 +479,7 @@ public class NexmarkUtils { /** * Return a transform to format each element as a string. */ - public static <T> ParDo.Bound<T, String> format(String name) { + public static <T> ParDo.SingleOutput<T, String> format(String name) { return ParDo.of(new DoFn<T, String>() { final Aggregator<Long, Long> recordCounter = createAggregator("records", Sum.ofLongs()); @@ -495,7 +495,7 @@ public class NexmarkUtils { /** * Return a transform to make explicit the timestamp of each element. */ - public static <T> ParDo.Bound<T, TimestampedValue<T>> stamp(String name) { + public static <T> ParDo.SingleOutput<T, TimestampedValue<T>> stamp(String name) { return ParDo.of(new DoFn<T, TimestampedValue<T>>() { @ProcessElement public void processElement(ProcessContext c) { @@ -548,7 +548,7 @@ public class NexmarkUtils { /** * Return a transform to keep the CPU busy for given milliseconds on every record. */ - public static <T> ParDo.Bound<T, T> cpuDelay(String name, final long delayMs) { + public static <T> ParDo.SingleOutput<T, T> cpuDelay(String name, final long delayMs) { return ParDo.of(new DoFn<T, T>() { @ProcessElement public void processElement(ProcessContext c) { @@ -580,7 +580,7 @@ public class NexmarkUtils { /** * Return a transform to write given number of bytes to durable store on every record. */ - public static <T> ParDo.Bound<T, T> diskBusy(String name, final long bytes) { + public static <T> ParDo.SingleOutput<T, T> diskBusy(String name, final long bytes) { return ParDo.of(new DoFn<T, T>() { @ProcessElement public void processElement(ProcessContext c) { @@ -608,7 +608,7 @@ public class NexmarkUtils { /** * Return a transform to cast each element to {@link KnownSize}. 
*/ - private static <T extends KnownSize> ParDo.Bound<T, KnownSize> castToKnownSize() { + private static <T extends KnownSize> ParDo.SingleOutput<T, KnownSize> castToKnownSize() { return ParDo.of(new DoFn<T, KnownSize>() { @ProcessElement public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/beam/blob/bd93c8b5/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java ---------------------------------------------------------------------- diff --git a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java index 2835737..f3d1ba4 100644 --- a/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java +++ b/integration/java/nexmark/src/main/java/org/apache/beam/integration/nexmark/queries/Query7.java @@ -70,18 +70,18 @@ public class Query7 extends NexmarkQuery { return slidingBids // Select all bids which have that maximum price (there may be more than one). - .apply(name + ".Select", - ParDo.withSideInputs(maxPriceView) - .of(new DoFn<Bid, Bid>() { - @ProcessElement - public void processElement(ProcessContext c) { - long maxPrice = c.sideInput(maxPriceView); - Bid bid = c.element(); - if (bid.price == maxPrice) { - c.output(bid); - } + .apply(name + ".Select", ParDo + .of(new DoFn<Bid, Bid>() { + @ProcessElement + public void processElement(ProcessContext c) { + long maxPrice = c.sideInput(maxPriceView); + Bid bid = c.element(); + if (bid.price == maxPrice) { + c.output(bid); } - })); + } + }) + .withSideInputs(maxPriceView)); } @Override
