Remove uses of ParDo.named. Also removed a few other uses of .named() methods that were near use-sites being cleaned up.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d7f2810a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d7f2810a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d7f2810a Branch: refs/heads/master Commit: d7f2810ad24e805793d6d43a639a48845aac4e48 Parents: 7745b92 Author: Ben Chambers <[email protected]> Authored: Thu Jun 23 09:53:40 2016 -0700 Committer: bchambers <[email protected]> Committed: Thu Jun 23 22:55:43 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/examples/MinimalWordCount.java | 2 +- .../beam/examples/complete/AutoComplete.java | 4 +- .../apache/beam/examples/complete/TfIdf.java | 13 +++--- .../examples/complete/TopWikipediaSessions.java | 7 ++- .../beam/examples/cookbook/FilterExamples.java | 3 +- .../beam/examples/cookbook/JoinExamples.java | 4 +- .../beam/examples/cookbook/TriggerExample.java | 2 +- .../beam/examples/complete/game/GameStats.java | 7 ++- .../examples/complete/game/HourlyTeamScore.java | 4 +- .../examples/complete/game/LeaderBoard.java | 4 +- .../beam/examples/complete/game/UserScore.java | 2 +- .../complete/game/utils/WriteToBigQuery.java | 2 +- .../game/utils/WriteWindowedToBigQuery.java | 2 +- .../complete/game/HourlyTeamScoreTest.java | 2 +- .../beam/runners/flink/examples/TFIDF.java | 48 ++++++++++---------- .../flink/examples/streaming/AutoComplete.java | 4 +- .../flink/examples/streaming/JoinExamples.java | 4 +- .../beam/runners/dataflow/DataflowRunner.java | 5 +- .../dataflow/internal/AssignWindows.java | 2 +- .../DataflowPipelineTranslatorTest.java | 6 +-- .../sdk/io/BoundedReadFromUnboundedSource.java | 3 +- .../apache/beam/sdk/io/PubsubUnboundedSink.java | 24 +++++----- .../beam/sdk/io/PubsubUnboundedSource.java | 5 +- .../org/apache/beam/sdk/testing/PAssert.java | 6 +-- .../org/apache/beam/sdk/transforms/Combine.java | 48 ++++++++++---------- 
.../org/apache/beam/sdk/transforms/Count.java | 2 +- .../beam/sdk/transforms/FlatMapElements.java | 2 +- .../org/apache/beam/sdk/transforms/Flatten.java | 2 +- .../org/apache/beam/sdk/transforms/Keys.java | 13 +++--- .../org/apache/beam/sdk/transforms/KvSwap.java | 15 +++--- .../apache/beam/sdk/transforms/MapElements.java | 2 +- .../apache/beam/sdk/transforms/PTransform.java | 7 ++- .../beam/sdk/transforms/RemoveDuplicates.java | 13 +++--- .../org/apache/beam/sdk/transforms/Values.java | 13 +++--- .../apache/beam/sdk/transforms/WithKeys.java | 15 +++--- .../beam/sdk/transforms/WithTimestamps.java | 2 +- .../beam/sdk/transforms/join/CoGroupByKey.java | 10 ++-- .../beam/sdk/transforms/windowing/Window.java | 21 ++++----- .../org/apache/beam/sdk/util/Reshuffle.java | 3 +- .../apache/beam/sdk/util/ValueWithRecordId.java | 19 +++----- .../apache/beam/sdk/transforms/ParDoTest.java | 27 +++-------- .../sdk/transforms/windowing/WindowingTest.java | 8 ++-- .../src/main/java/MinimalWordCount.java | 4 +- 43 files changed, 177 insertions(+), 214 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java index 355a1ff..2c67609 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java @@ -92,7 +92,7 @@ public class MinimalWordCount { // DoFn (defined in-line) on each element that tokenizes the text line into individual words. // The ParDo returns a PCollection<String>, where each element is an individual word in // Shakespeare's collected texts. 
- .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { + .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java index c6893f4..2732aa5 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java @@ -130,7 +130,7 @@ public class AutoComplete { .apply(new Count.PerElement<String>()) // Map the KV outputs of Count into our own CompletionCandiate class. - .apply(ParDo.named("CreateCompletionCandidates").of( + .apply("CreateCompletionCandidates", ParDo.of( new DoFn<KV<String, Long>, CompletionCandidate>() { @Override public void processElement(ProcessContext c) { @@ -481,7 +481,7 @@ public class AutoComplete { if (options.getOutputToDatastore()) { toWrite - .apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind()))) + .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind()))) .apply(DatastoreIO.writeTo(MoreObjects.firstNonNull( options.getOutputProject(), options.getProject()))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index 73f3323..23653d6 100644 --- 
a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -225,7 +225,7 @@ public class TfIdf { // Create a collection of pairs mapping a URI to each // of the words in the document associated with that that URI. PCollection<KV<URI, String>> uriToWords = uriToContent - .apply(ParDo.named("SplitWords").of( + .apply("SplitWords", ParDo.of( new DoFn<KV<URI, String>, KV<URI, String>>() { @Override public void processElement(ProcessContext c) { @@ -268,7 +268,7 @@ public class TfIdf { // from URI to (word, count) pairs, to prepare for a join // by the URI key. PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount - .apply(ParDo.named("ShiftKeys").of( + .apply("ShiftKeys", ParDo.of( new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { @Override public void processElement(ProcessContext c) { @@ -307,7 +307,7 @@ public class TfIdf { // is simply the number of times that word occurs in the document // divided by the total number of words in the document. PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal - .apply(ParDo.named("ComputeTermFrequencies").of( + .apply("ComputeTermFrequencies", ParDo.of( new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { @Override public void processElement(ProcessContext c) { @@ -331,8 +331,7 @@ public class TfIdf { // documents is passed as a side input; the same value is // presented to each invocation of the DoFn. PCollection<KV<String, Double>> wordToDf = wordToDocCount - .apply(ParDo - .named("ComputeDocFrequencies") + .apply("ComputeDocFrequencies", ParDo .withSideInputs(totalDocuments) .of(new DoFn<KV<String, Long>, KV<String, Double>>() { @Override @@ -362,7 +361,7 @@ public class TfIdf { // here we use a basic version that is the term frequency // divided by the log of the document frequency. 
PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf - .apply(ParDo.named("ComputeTfIdf").of( + .apply("ComputeTfIdf", ParDo.of( new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { @Override public void processElement(ProcessContext c) { @@ -402,7 +401,7 @@ public class TfIdf { @Override public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { return wordToUriAndTfIdf - .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { + .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() { @Override public void processElement(ProcessContext c) { c.output(String.format("%s,\t%s,\t%f", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 2c0d0eb..5d95e3f 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -168,7 +168,7 @@ public class TopWikipediaSessions { return input .apply(ParDo.of(new ExtractUserAndTimestamp())) - .apply(ParDo.named("SampleUsers").of( + .apply("SampleUsers", ParDo.of( new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { @@ -179,10 +179,9 @@ public class TopWikipediaSessions { })) .apply(new ComputeSessions()) - - .apply(ParDo.named("SessionsToStrings").of(new SessionsToStringsDoFn())) + .apply("SessionsToStrings", ParDo.of(new SessionsToStringsDoFn())) .apply(new TopPerMonth()) - .apply(ParDo.named("FormatOutput").of(new FormatOutputDoFn())); + .apply("FormatOutput", ParDo.of(new FormatOutputDoFn())); } 
} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java index a669b0c..017be21 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java @@ -189,8 +189,7 @@ public class FilterExamples { // that is computed earlier in pipeline execution. // We'll only output readings with temperatures below this mean. PCollection<TableRow> filteredRows = monthFilteredRows - .apply(ParDo - .named("ParseAndFilter") + .apply("ParseAndFilter", ParDo .withSideInputs(globalMeanTemp) .of(new DoFn<TableRow, TableRow>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java index e8f1f01..3c26123 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java @@ -98,7 +98,7 @@ public class JoinExamples { // Process the CoGbkResult elements generated by the CoGroupByKey transform. 
// country code 'key' -> string of <event info>, <country name> PCollection<KV<String, String>> finalResultCollection = - kvpCollection.apply(ParDo.named("Process").of( + kvpCollection.apply("Process", ParDo.of( new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { @Override public void processElement(ProcessContext c) { @@ -116,7 +116,7 @@ public class JoinExamples { // write to GCS PCollection<String> formattedResults = finalResultCollection - .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { + .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { @Override public void processElement(ProcessContext c) { String outputstring = "Country code: " + c.element().getKey() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index c614550..ab1fb66 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -494,7 +494,7 @@ public class TriggerExample { Pipeline injectorPipeline = Pipeline.create(copiedOptions); injectorPipeline .apply(TextIO.Read.named("ReadMyFile").from(options.getInput())) - .apply(ParDo.named("InsertRandomDelays").of(new InsertDelays())) + .apply("InsertRandomDelays", ParDo.of(new InsertDelays())) .apply(IntraBundleParallelization.of(PubsubFileInjector .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY) .publish(options.getPubsubTopic())) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java 
---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index c8bcc8c..ad8b49e 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -123,8 +123,7 @@ public class GameStats extends LeaderBoard { // Filter the user sums using the global mean. PCollection<KV<String, Integer>> filtered = sumScores - .apply(ParDo - .named("ProcessAndFilter") + .apply("ProcessAndFilter", ParDo // use the derived mean total score as a side input .withSideInputs(globalMeanScore) .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() { @@ -249,7 +248,7 @@ public class GameStats extends LeaderBoard { // Read Events from Pub/Sub using custom timestamps PCollection<GameActionInfo> rawEvents = pipeline .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())) - .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())); + .apply("ParseGameEvent", ParDo.of(new ParseEventFn())); // Extract username/score pairs from the event stream PCollection<KV<String, Integer>> userEvents = @@ -284,7 +283,7 @@ public class GameStats extends LeaderBoard { Duration.standardMinutes(options.getFixedWindowDuration()))) ) // Filter out the detected spammer users, using the side input derived above. 
- .apply(ParDo.named("FilterOutSpammers") + .apply("FilterOutSpammers", ParDo .withSideInputs(spammersView) .of(new DoFn<GameActionInfo, GameActionInfo>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index c5c2fb5..7a808ac 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -109,9 +109,11 @@ public class HourlyTeamScore extends UserScore { String getStopMin(); void setStopMin(String value); + @Override @Description("The BigQuery table name. Should not already exist.") @Default.String("hourly_team_score") String getTableName(); + @Override void setTableName(String value); } @@ -155,7 +157,7 @@ public class HourlyTeamScore extends UserScore { // Read 'gaming' events from a text file. pipeline.apply(TextIO.Read.from(options.getInput())) // Parse the incoming data. - .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) + .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) // Filter out data before and after the given times so that it is not included // in the calculations. 
As we collect data in batches (say, by day), the batch for the day http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index 12d2729..2c608aa 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -119,9 +119,11 @@ public class LeaderBoard extends HourlyTeamScore { Integer getAllowedLateness(); void setAllowedLateness(Integer value); + @Override @Description("Prefix used for the BigQuery table names") @Default.String("leaderboard") String getTableName(); + @Override void setTableName(String value); } @@ -183,7 +185,7 @@ public class LeaderBoard extends HourlyTeamScore { // data elements, and parse the data. PCollection<GameActionInfo> gameEvents = pipeline .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic())) - .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())); + .apply("ParseGameEvent", ParDo.of(new ParseEventFn())); // [START DocInclude_WindowAndTrigger] // Extract team/score pairs from the event stream, using hour-long windows by default. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java index 97b6929..28614cb 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java @@ -226,7 +226,7 @@ public class UserScore { // Read events from a text file and parse them. pipeline.apply(TextIO.Read.from(options.getInput())) - .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) + .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) // Extract and sum username/score pairs from the event data. .apply("ExtractUserScore", new ExtractAndSumScore("user")) .apply("WriteUserScoreSums", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java index 5897e44..5b472d7 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteToBigQuery.java @@ -115,7 +115,7 @@ public class WriteToBigQuery<T> @Override public PDone apply(PCollection<T> teamAndScore) { return teamAndScore - .apply(ParDo.named("ConvertToRow").of(new BuildRowFn())) + .apply("ConvertToRow", ParDo.of(new BuildRowFn())) .apply(BigQueryIO.Write 
.to(getTable(teamAndScore.getPipeline(), tableName)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java index 27697db..b1ccaed 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/utils/WriteWindowedToBigQuery.java @@ -66,7 +66,7 @@ public class WriteWindowedToBigQuery<T> @Override public PDone apply(PCollection<T> teamAndScore) { return teamAndScore - .apply(ParDo.named("ConvertToRow").of(new BuildRowFn())) + .apply("ConvertToRow", ParDo.of(new BuildRowFn())) .apply(BigQueryIO.Write .to(getTable(teamAndScore.getPipeline(), tableName)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java index 4254902..b917b4c 100644 --- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java +++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/HourlyTeamScoreTest.java @@ -94,7 +94,7 @@ public class HourlyTeamScoreTest implements Serializable { PCollection<String> input = p.apply(Create.of(GAME_EVENTS).withCoder(StringUtf8Coder.of())); PCollection<KV<String, Integer>> output = input - 
.apply(ParDo.named("ParseGameEvent").of(new ParseEventFn())) + .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) .apply("FilterStartTime", Filter.by( (GameActionInfo gInfo) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index af920aa..084ac7c 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -231,26 +231,25 @@ public class TFIDF { // Create a collection of pairs mapping a URI to each // of the words in the document associated with that that URI. PCollection<KV<URI, String>> uriToWords = uriToContent - .apply(ParDo.named("SplitWords").of( - new DoFn<KV<URI, String>, KV<URI, String>>() { - private static final long serialVersionUID = 0; + .apply("SplitWords", ParDo.of(new DoFn<KV<URI, String>, KV<URI, String>>() { + private static final long serialVersionUID = 0; - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - String line = c.element().getValue(); - for (String word : line.split("\\W+")) { - // Log INFO messages when the word âloveâ is found. - if (word.toLowerCase().equals("love")) { - LOG.info("Found {}", word.toLowerCase()); - } - - if (!word.isEmpty()) { - c.output(KV.of(uri, word.toLowerCase())); - } - } + @Override + public void processElement(ProcessContext c) { + URI uri = c.element().getKey(); + String line = c.element().getValue(); + for (String word : line.split("\\W+")) { + // Log INFO messages when the word âloveâ is found. 
+ if (word.toLowerCase().equals("love")) { + LOG.info("Found {}", word.toLowerCase()); } - })); + + if (!word.isEmpty()) { + c.output(KV.of(uri, word.toLowerCase())); + } + } + } + })); // Compute a mapping from each word to the total // number of documents in which it appears. @@ -276,7 +275,7 @@ public class TFIDF { // from URI to (word, count) pairs, to prepare for a join // by the URI key. PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount - .apply(ParDo.named("ShiftKeys").of( + .apply("ShiftKeys", ParDo.of( new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() { private static final long serialVersionUID = 0; @@ -317,7 +316,7 @@ public class TFIDF { // is simply the number of times that word occurs in the document // divided by the total number of words in the document. PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal - .apply(ParDo.named("ComputeTermFrequencies").of( + .apply("ComputeTermFrequencies", ParDo.of( new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() { private static final long serialVersionUID = 0; @@ -343,8 +342,7 @@ public class TFIDF { // documents is passed as a side input; the same value is // presented to each invocation of the DoFn. PCollection<KV<String, Double>> wordToDf = wordToDocCount - .apply(ParDo - .named("ComputeDocFrequencies") + .apply("ComputeDocFrequencies", ParDo .withSideInputs(totalDocuments) .of(new DoFn<KV<String, Long>, KV<String, Double>>() { private static final long serialVersionUID = 0; @@ -377,9 +375,9 @@ public class TFIDF { // divided by the log of the document frequency. 
return wordToUriAndTfAndDf - .apply(ParDo.named("ComputeTfIdf").of( + .apply("ComputeTfIdf", ParDo.of( new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() { - private static final long serialVersionUID1 = 0; + private static final long serialVersionUID = 0; @Override public void processElement(ProcessContext c) { @@ -419,7 +417,7 @@ public class TFIDF { @Override public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) { return wordToUriAndTfIdf - .apply(ParDo.named("Format").of(new DoFn<KV<String, KV<URI, Double>>, String>() { + .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() { private static final long serialVersionUID = 0; @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java index 9299955..ed11781 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -92,7 +92,7 @@ public class AutoComplete { .apply(new Count.PerElement<String>()) // Map the KV outputs of Count into our own CompletionCandiate class. 
- .apply(ParDo.named("CreateCompletionCandidates").of( + .apply("CreateCompletionCandidates", ParDo.of( new DoFn<KV<String, Long>, CompletionCandidate>() { private static final long serialVersionUID = 0; @@ -395,7 +395,7 @@ public class AutoComplete { .apply(ComputeTopCompletions.top(10, options.getRecursive())); toWrite - .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile())) + .apply("FormatForPerTaskFile", ParDo.of(new FormatForPerTaskLocalFile())) .apply(TextIO.Write.to("./outputAutoComplete.txt")); p.run(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java index b447a20..0828ddc 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java @@ -77,7 +77,7 @@ public class JoinExamples { // Process the CoGbkResult elements generated by the CoGroupByKey transform. 
// country code 'key' -> string of <event info>, <country name> PCollection<KV<String, String>> finalResultCollection = - kvpCollection.apply(ParDo.named("Process").of( + kvpCollection.apply("Process", ParDo.of( new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { private static final long serialVersionUID = 0; @@ -100,7 +100,7 @@ public class JoinExamples { })); return finalResultCollection - .apply(ParDo.named("Format").of(new DoFn<KV<String, String>, String>() { + .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() { private static final long serialVersionUID = 0; @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index 5818ba5..d47d285 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -179,6 +179,7 @@ import java.util.Random; import java.util.Set; import java.util.SortedSet; import java.util.TreeSet; + import javax.annotation.Nullable; /** @@ -2540,7 +2541,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { .apply(new Deduplicate<T>()); } else { return Pipeline.applyTransform(input, new ReadWithIds<T>(source)) - .apply(ValueWithRecordId.<T>stripIds()); + .apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>())); } } @@ -2613,7 +2614,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { // Reshuffle will dedup based on ids in ValueWithRecordId by passing the data through // WindmillSink. 
.apply(Reshuffle.<Integer, ValueWithRecordId<T>>of()) - .apply(ParDo.named("StripIds").of( + .apply("StripIds", ParDo.of( new DoFn<KV<Integer, ValueWithRecordId<T>>, T>() { @Override public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java index 1b18c2a..5f808a5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/AssignWindows.java @@ -63,7 +63,7 @@ public class AssignWindows<T> extends PTransform<PCollection<T>, PCollection<T>> } else { // If the windowFn didn't change, we just run a pass-through transform and then set the // new windowing strategy. 
- return input.apply(ParDo.named("Identity").of(new DoFn<T, T>() { + return input.apply("Identity", ParDo.of(new DoFn<T, T>() { @Override public void processElement(DoFn<T, T>.ProcessContext c) throws Exception { c.output(c.element()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 261ba99..c3a6a11 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -513,9 +513,9 @@ public class DataflowPipelineTranslatorTest implements Serializable { DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options); Pipeline pipeline = Pipeline.create(options); String stepName = "DoFn1"; - pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) - .apply(ParDo.of(new NoOpFn()).named(stepName)) - .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/out")); + pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in")) + .apply(stepName, ParDo.of(new NoOpFn())) + .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out")); Job job = translator .translate( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java index ba13f9d..cfdd581 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.RemoveDuplicates; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -96,7 +97,7 @@ class BoundedReadFromUnboundedSource<T> extends PTransform<PInput, PCollection<T } })); } - return read.apply(ValueWithRecordId.<T>stripIds()); + return read.apply("StripIds", ParDo.of(new ValueWithRecordId.StripIdsDoFn<T>())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java index a165c91..78758a2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSink.java @@ -420,22 +420,20 @@ public class PubsubUnboundedSink<T> extends PTransform<PCollection<T>, PDone> { @Override public PDone apply(PCollection<T> input) { - input.apply( - Window.named("PubsubUnboundedSink.Window") - .<T>into(new GlobalWindows()) - .triggering( - Repeatedly.forever( - AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize), - AfterProcessingTime.pastFirstElementInPane() - .plusDelayOf(maxLatency)))) + 
input.apply("PubsubUnboundedSink.Window", Window.<T>into(new GlobalWindows()) + .triggering( + Repeatedly.forever( + AfterFirst.of(AfterPane.elementCountAtLeast(publishBatchSize), + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(maxLatency)))) .discardingFiredPanes()) - .apply(ParDo.named("PubsubUnboundedSink.Shard") - .of(new ShardFn<T>(elementCoder, numShards, recordIdMethod))) + .apply("PubsubUnboundedSink.Shard", + ParDo.of(new ShardFn<T>(elementCoder, numShards, recordIdMethod))) .setCoder(KvCoder.of(VarIntCoder.of(), CODER)) .apply(GroupByKey.<Integer, OutgoingMessage>create()) - .apply(ParDo.named("PubsubUnboundedSink.Writer") - .of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel, - publishBatchSize, publishBatchBytes))); + .apply("PubsubUnboundedSink.Writer", + ParDo.of(new WriterFn(pubsubFactory, topic, timestampLabel, idLabel, + publishBatchSize, publishBatchBytes))); return PDone.in(input.getPipeline()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java index e7634ec..07d355e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubUnboundedSource.java @@ -1296,8 +1296,7 @@ public class PubsubUnboundedSource<T> extends PTransform<PBegin, PCollection<T>> return input.getPipeline().begin() .apply(Read.from(new PubsubSource<T>(this))) - .apply(ParDo.named("PubsubUnboundedSource.Stats") - .of(new StatsFn<T>(pubsubFactory, subscription, - timestampLabel, idLabel))); + .apply("PubsubUnboundedSource.Stats", + ParDo.of(new StatsFn<T>(pubsubFactory, subscription, timestampLabel, idLabel))); } } 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index a29a56d..1a3d85d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -718,10 +718,8 @@ public class PAssert { input .apply(Create.of(0).withCoder(VarIntCoder.of())) - .apply( - ParDo.named("RunChecks") - .withSideInputs(actual) - .of(new SideInputCheckerDoFn<>(checkerFn, actual))); + .apply("RunChecks", + ParDo.withSideInputs(actual).of(new SideInputCheckerDoFn<>(checkerFn, actual))); return PDone.in(input.getPipeline()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 20c1242..7871672 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.AbstractPerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn; @@ -1400,7 +1401,7 @@ public class 
Combine { final OutputT defaultValue = fn.defaultValue(); PCollection<OutputT> defaultIfEmpty = maybeEmpty.getPipeline() .apply("CreateVoid", Create.of((Void) null).withCoder(VoidCoder.of())) - .apply(ParDo.named("ProduceDefault").withSideInputs(maybeEmptyView).of( + .apply("ProduceDefault", ParDo.withSideInputs(maybeEmptyView).of( new DoFn<Void, OutputT>() { @Override public void processElement(DoFn<Void, OutputT>.ProcessContext c) { @@ -2024,28 +2025,27 @@ public class Combine { // augmenting the hot keys with a nonce. final TupleTag<KV<KV<K, Integer>, InputT>> hot = new TupleTag<>(); final TupleTag<KV<K, InputT>> cold = new TupleTag<>(); - PCollectionTuple split = input.apply( - ParDo.named("AddNonce").of( - new DoFn<KV<K, InputT>, KV<K, InputT>>() { - transient int counter; - @Override - public void startBundle(Context c) { - counter = ThreadLocalRandom.current().nextInt( - Integer.MAX_VALUE); - } + PCollectionTuple split = input.apply("AddNonce", ParDo.of( + new DoFn<KV<K, InputT>, KV<K, InputT>>() { + transient int counter; + @Override + public void startBundle(Context c) { + counter = ThreadLocalRandom.current().nextInt( + Integer.MAX_VALUE); + } - @Override - public void processElement(ProcessContext c) { - KV<K, InputT> kv = c.element(); - int spread = Math.max(1, hotKeyFanout.apply(kv.getKey())); - if (spread <= 1) { - c.output(kv); - } else { - int nonce = counter++ % spread; - c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue())); - } - } - }) + @Override + public void processElement(ProcessContext c) { + KV<K, InputT> kv = c.element(); + int spread = Math.max(1, hotKeyFanout.apply(kv.getKey())); + if (spread <= 1) { + c.output(kv); + } else { + int nonce = counter++ % spread; + c.sideOutput(hot, KV.of(KV.of(kv.getKey(), nonce), kv.getValue())); + } + } + }) .withOutputTags(cold, TupleTagList.of(hot))); // The first level of combine should never use accumulating mode. 
@@ -2063,7 +2063,7 @@ public class Combine { inputCoder.getValueCoder())) .setWindowingStrategyInternal(preCombineStrategy) .apply("PreCombineHot", Combine.perKey(hotPreCombine)) - .apply(ParDo.named("StripNonce").of( + .apply("StripNonce", ParDo.of( new DoFn<KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() { @Override @@ -2079,7 +2079,7 @@ public class Combine { PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold = split .get(cold) .setCoder(inputCoder) - .apply(ParDo.named("PrepareCold").of( + .apply("PrepareCold", ParDo.of( new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>() { @Override public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java index 3bf264e..3a0fb5d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java @@ -107,7 +107,7 @@ public class Count { public PCollection<KV<T, Long>> apply(PCollection<T> input) { return input - .apply(ParDo.named("Init").of(new DoFn<T, KV<T, Void>>() { + .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() { @Override public void processElement(ProcessContext c) { c.output(KV.of(c.element(), (Void) null)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java index 932643f..4f270a7 100644 
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java @@ -133,7 +133,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { @Override public PCollection<OutputT> apply(PCollection<InputT> input) { - return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() { + return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() { private static final long serialVersionUID = 0L; @Override public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java index 93917f3..0b83fb6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java @@ -173,7 +173,7 @@ public class Flatten { @SuppressWarnings("unchecked") Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder(); - return in.apply(ParDo.named("FlattenIterables").of( + return in.apply("FlattenIterables", ParDo.of( new DoFn<Iterable<T>, T>() { @Override public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java index a1e5e0a..636e306 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java +++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java @@ -58,12 +58,11 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>, @Override public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) { return - in.apply(ParDo.named("Keys") - .of(new DoFn<KV<K, ?>, K>() { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().getKey()); - } - })); + in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() { + @Override + public void processElement(ProcessContext c) { + c.output(c.element().getKey()); + } + })); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java index 2dfc7c1..9597c92 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java @@ -62,13 +62,12 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>, @Override public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) { return - in.apply(ParDo.named("KvSwap") - .of(new DoFn<KV<K, V>, KV<V, K>>() { - @Override - public void processElement(ProcessContext c) { - KV<K, V> e = c.element(); - c.output(KV.of(e.getValue(), e.getKey())); - } - })); + in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() { + @Override + public void processElement(ProcessContext c) { + KV<K, V> e = c.element(); + c.output(KV.of(e.getValue(), e.getKey())); + } + })); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java ---------------------------------------------------------------------- diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java index d64bad1..f535111 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java @@ -104,7 +104,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> { @Override public PCollection<OutputT> apply(PCollection<InputT> input) { - return input.apply(ParDo.named("Map").of(new DoFn<InputT, OutputT>() { + return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() { @Override public void processElement(ProcessContext c) { c.output(fn.apply(c.element())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java index 4bcfb29..a56eefc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java @@ -91,7 +91,7 @@ import java.io.Serializable; * * <pre> {@code * ... - * .apply(ParDo.named("Step1").of(new MyDoFn3())) + * .apply("Step1", ParDo.of(new MyDoFn3())) * ... * } </pre> * @@ -218,9 +218,8 @@ public abstract class PTransform<InputT extends PInput, OutputT extends POutput> // understand why all of its instance state is transient. /** - * The base name of this {@code PTransform}, e.g., from - * {@link ParDo#named(String)}, or from defaults, or {@code null} if not - * yet assigned. + * The base name of this {@code PTransform}, e.g., from defaults, or + * {@code null} if not yet assigned. 
*/ protected final transient String name; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java index 84b1f80..b82744d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java @@ -85,13 +85,12 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>, @Override public PCollection<T> apply(PCollection<T> in) { return in - .apply(ParDo.named("CreateIndex") - .of(new DoFn<T, KV<T, Void>>() { - @Override - public void processElement(ProcessContext c) { - c.output(KV.of(c.element(), (Void) null)); - } - })) + .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.of(c.element(), (Void) null)); + } + })) .apply(Combine.<T, Void>perKey( new SerializableFunction<Iterable<Void>, Void>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java index 0b533b8..b3d38b9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java @@ -58,12 +58,11 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>, @Override public PCollection<V> apply(PCollection<? 
extends KV<?, V>> in) { return - in.apply(ParDo.named("Values") - .of(new DoFn<KV<?, V>, V>() { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().getValue()); - } - })); + in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() { + @Override + public void processElement(ProcessContext c) { + c.output(c.element().getValue()); + } + })); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java index 198e7cb..25116d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java @@ -113,14 +113,13 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>, @Override public PCollection<KV<K, V>> apply(PCollection<V> in) { PCollection<KV<K, V>> result = - in.apply(ParDo.named("AddKeys") - .of(new DoFn<V, KV<K, V>>() { - @Override - public void processElement(ProcessContext c) { - c.output(KV.of(fn.apply(c.element()), - c.element())); - } - })); + in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() { + @Override + public void processElement(ProcessContext c) { + c.output(KV.of(fn.apply(c.element()), + c.element())); + } + })); try { Coder<K> keyCoder; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java index eae8de5..ef4b269 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java @@ -101,7 +101,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T> @Override public PCollection<T> apply(PCollection<T> input) { return input - .apply(ParDo.named("AddTimestamps").of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew))) + .apply("AddTimestamps", ParDo.of(new AddTimestampsDoFn<T>(fn, allowedTimestampSkew))) .setTypeDescriptorInternal(input.getTypeDescriptor()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java index d7ac5e4..ba4a4a7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java @@ -128,9 +128,8 @@ public class CoGroupByKey<K> extends flattenedTable.apply(GroupByKey.<K, RawUnionValue>create()); CoGbkResultSchema tupleTags = input.getCoGbkResultSchema(); - PCollection<KV<K, CoGbkResult>> result = groupedTable.apply( - ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags)) - .named("ConstructCoGbkResultFn")); + PCollection<KV<K, CoGbkResult>> result = groupedTable.apply("ConstructCoGbkResultFn", + ParDo.of(new ConstructCoGbkResultFn<K>(tupleTags))); result.setCoder(KvCoder.of(keyCoder, CoGbkResultCoder.of(tupleTags, unionCoder))); @@ -163,9 +162,8 @@ public class CoGroupByKey<K> extends PCollection<KV<K, V>> pCollection, KvCoder<K, RawUnionValue> unionTableEncoder) { - return pCollection.apply(ParDo.of( - new ConstructUnionTableFn<K, V>(index)).named("MakeUnionTable" + 
index)) - .setCoder(unionTableEncoder); + return pCollection.apply("MakeUnionTable" + index, + ParDo.of(new ConstructUnionTableFn<K, V>(index))).setCoder(unionTableEncoder); } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 324b4d5..7d790d4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -645,16 +645,6 @@ public class Window { } } - ///////////////////////////////////////////////////////////////////////////// - - private static <T> PTransform<PCollection<? extends T>, PCollection<T>> identity() { - return ParDo.named("Identity").of(new DoFn<T, T>() { - @Override public void processElement(ProcessContext c) { - c.output(c.element()); - } - }); - } - /** * Creates a {@code Window} {@code PTransform} that does not change assigned * windows, but will cause windows to be merged again as part of the next @@ -675,7 +665,16 @@ public class Window { WindowingStrategy<?, ?> outputWindowingStrategy = getOutputWindowing( input.getWindowingStrategy()); - return input.apply(Window.<T>identity()) + return input + // We first apply a (trivial) transform to the input PCollection to produce a new + // PCollection. This ensures that we don't modify the windowing strategy of the input + // which may be used elsewhere. + .apply("Identity", ParDo.of(new DoFn<T, T>() { + @Override public void processElement(ProcessContext c) { + c.output(c.element()); + } + })) + // Then we modify the windowing strategy. 
.setWindowingStrategyInternal(outputWindowingStrategy); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java index c0d159b..6c58689 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Reshuffle.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; + import org.joda.time.Duration; /** @@ -68,7 +69,7 @@ public class Reshuffle<K, V> extends PTransform<PCollection<KV<K, V>>, PCollecti // Set the windowing strategy directly, so that it doesn't get counted as the user having // set allowed lateness. 
.setWindowingStrategyInternal(originalStrategy) - .apply(ParDo.named("ExpandIterable").of( + .apply("ExpandIterable", ParDo.of( new DoFn<KV<K, Iterable<V>>, KV<K, V>>() { @Override public void processElement(ProcessContext c) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java index 8e4e134..80dfcae 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ValueWithRecordId.java @@ -21,9 +21,6 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.StandardCoder; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; @@ -142,15 +139,11 @@ public class ValueWithRecordId<ValueT> { ByteArrayCoder idCoder; } - public static <T> - PTransform<PCollection<? extends ValueWithRecordId<T>>, PCollection<T>> stripIds() { - return ParDo.named("StripIds") - .of( - new DoFn<ValueWithRecordId<T>, T>() { - @Override - public void processElement(ProcessContext c) { - c.output(c.element().getValue()); - } - }); + /** {@link DoFn} to turn a {@code ValueWithRecordId<T>} back to the value {@code T}. 
*/ + public static class StripIdsDoFn<T> extends DoFn<ValueWithRecordId<T>, T> { + @Override + public void processElement(ProcessContext c) { + c.output(c.element().getValue()); + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java index 03ecf6f..db32fa6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java @@ -24,6 +24,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.include import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; import static org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray; + import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.equalTo; @@ -815,37 +816,22 @@ public class ParDoTest implements Serializable { .setName("MyInput"); { - PCollection<String> output1 = - input - .apply(ParDo.of(new TestDoFn())); + PCollection<String> output1 = input.apply(ParDo.of(new TestDoFn())); assertEquals("ParDo(Test).out", output1.getName()); } { - PCollection<String> output2 = - input - .apply(ParDo.named("MyParDo").of(new TestDoFn())); + PCollection<String> output2 = input.apply("MyParDo", ParDo.of(new TestDoFn())); assertEquals("MyParDo.out", output2.getName()); } { - PCollection<String> output3 = - input - .apply(ParDo.of(new TestDoFn()).named("HerParDo")); - assertEquals("HerParDo.out", output3.getName()); - } - - { - PCollection<String> output4 = - input - .apply(ParDo.of(new TestDoFn()).named("TestDoFn")); + 
PCollection<String> output4 = input.apply("TestDoFn", ParDo.of(new TestDoFn())); assertEquals("TestDoFn.out", output4.getName()); } { - PCollection<String> output5 = - input - .apply(ParDo.of(new StrangelyNamedDoer())); + PCollection<String> output5 = input.apply(ParDo.of(new StrangelyNamedDoer())); assertEquals("ParDo(StrangelyNamedDoer).out", output5.getName()); } @@ -869,8 +855,7 @@ public class ParDoTest implements Serializable { PCollectionTuple outputs = p .apply(Create.of(Arrays.asList(3, -42, 666))).setName("MyInput") - .apply(ParDo - .named("MyParDo") + .apply("MyParDo", ParDo .of(new TestDoFn( Arrays.<PCollectionView<Integer>>asList(), Arrays.asList(sideOutputTag1, sideOutputTag2, sideOutputTag3))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 5377f23..21f58df 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -52,7 +52,6 @@ import java.io.Serializable; /** Unit tests for bucketing. 
*/ @RunWith(JUnit4.class) -@SuppressWarnings("unchecked") public class WindowingTest implements Serializable { @Rule public transient TemporaryFolder tmpFolder = new TemporaryFolder(); @@ -73,12 +72,11 @@ public class WindowingTest implements Serializable { } @Override public PCollection<String> apply(PCollection<String> in) { - return in.apply( - Window.named("Window") - .<String>into(windowFn) + return in.apply("Window", + Window.<String>into(windowFn) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) .apply(Count.<String>perElement()) - .apply(ParDo.named("FormatCounts").of(new FormatCountsDoFn())) + .apply("FormatCounts", ParDo.of(new FormatCountsDoFn())) .setCoder(StringUtf8Coder.of()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d7f2810a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java index 98af2e7..be32afa 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/MinimalWordCount.java @@ -82,7 +82,7 @@ public class MinimalWordCount { // DoFn (defined in-line) on each element that tokenizes the text line into individual words. // The ParDo returns a PCollection<String>, where each element is an individual word in // Shakespeare's collected texts. 
- .apply(ParDo.named("ExtractWords").of(new DoFn<String, String>() { + .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { @Override public void processElement(ProcessContext c) { for (String word : c.element().split("[^a-zA-Z']+")) { @@ -98,7 +98,7 @@ public class MinimalWordCount { .apply(Count.<String>perElement()) // Apply another ParDo transform that formats our PCollection of word counts into a printable // string, suitable for writing to an output file. - .apply(ParDo.named("FormatResults").of(new DoFn<KV<String, Long>, String>() { + .apply("FormatResults", ParDo.of(new DoFn<KV<String, Long>, String>() { @Override public void processElement(ProcessContext c) { c.output(c.element().getKey() + ": " + c.element().getValue());
