Remove many uses of .named methods. Specifically, remove uses of: - Window.named - AvroIO.named - PubSubIO.named - TextIO.named - BigQueryIO.named - Read.named
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/57195358 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/57195358 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/57195358 Branch: refs/heads/master Commit: 57195358592548e6f7e05bc8e4e292b126a726c5 Parents: 4f580f5 Author: Ben Chambers <[email protected]> Authored: Thu Jun 23 22:27:05 2016 -0700 Committer: Ben Chambers <[email protected]> Committed: Sun Jun 26 10:06:35 2016 -0700 ---------------------------------------------------------------------- .../beam/examples/DebuggingWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 4 +- .../apache/beam/examples/complete/TfIdf.java | 3 +- .../examples/complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/DatastoreWordCount.java | 2 +- .../beam/examples/cookbook/DeDupExample.java | 5 +- .../beam/examples/cookbook/TriggerExample.java | 4 +- .../beam/examples/complete/game/GameStats.java | 28 ++- .../examples/complete/game/HourlyTeamScore.java | 5 +- .../examples/complete/game/LeaderBoard.java | 8 +- .../beam/runners/flink/examples/TFIDF.java | 3 +- .../beam/runners/flink/examples/WordCount.java | 4 +- .../flink/examples/streaming/AutoComplete.java | 9 +- .../flink/examples/streaming/JoinExamples.java | 13 +- .../KafkaWindowedWordCountExample.java | 2 +- .../examples/streaming/WindowedWordCount.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../DataflowPipelineTranslatorTest.java | 6 +- .../runners/dataflow/DataflowRunnerTest.java | 18 +- .../beam/runners/spark/SimpleWordCountTest.java | 3 +- .../java/org/apache/beam/sdk/io/BigQueryIO.java | 1 - .../beam/sdk/io/AvroIOGeneratedClassTest.java | 192 +++++-------------- .../java/org/apache/beam/sdk/io/AvroIOTest.java | 5 +- .../org/apache/beam/sdk/io/BigQueryIOTest.java | 82 +++----- .../apache/beam/sdk/io/FileBasedSourceTest.java | 5 +- 
.../org/apache/beam/sdk/io/PubsubIOTest.java | 4 - .../java/org/apache/beam/sdk/io/TextIOTest.java | 37 +--- .../org/apache/beam/sdk/io/XmlSourceTest.java | 19 +- .../beam/sdk/runners/TransformTreeTest.java | 4 +- .../sdk/transforms/windowing/WindowTest.java | 6 +- .../sdk/transforms/windowing/WindowingTest.java | 2 +- .../src/main/java/DebuggingWordCount.java | 2 +- .../src/main/java/WordCount.java | 4 +- 33 files changed, 158 insertions(+), 331 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 85823c2..8d85d44 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -173,7 +173,7 @@ public class DebuggingWordCount { Pipeline p = Pipeline.create(options); PCollection<KV<String, Long>> filteredWords = - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) + p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new WordCount.CountWords()) .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index cf6c45a..af16c44 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ 
b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -205,10 +205,10 @@ public class WordCount { // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the // static FormatAsTextFn() to the ParDo transform. - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) + p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new CountWords()) .apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); + .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index 23653d6..8305314 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -187,8 +187,7 @@ public class TfIdf { } PCollection<KV<URI, String>> oneUriToLines = pipeline - .apply(TextIO.Read.from(uriString) - .named("TextIO.Read(" + uriString + ")")) + .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString)) .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); urisToLines = urisToLines.and(oneUriToLines); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 5d95e3f..80b3ade 100644 --- 
a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -217,7 +217,7 @@ public class TopWikipediaSessions { .from(options.getInput()) .withCoder(TableRowJsonCoder.of())) .apply(new ComputeTopSessions(samplingThreshold)) - .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput())); + .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput())); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java index 7578d79..b070f94 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java @@ -193,7 +193,7 @@ public class DatastoreWordCount { */ public static void writeDataToDatastore(Options options) { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + p.apply("ReadLines", TextIO.Read.from(options.getInput())) .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind()))) .apply(DatastoreIO.writeTo(options.getProject())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java index db65543..d573bcd 100644 --- 
a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java @@ -89,10 +89,9 @@ public class DeDupExample { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + p.apply("ReadLines", TextIO.Read.from(options.getInput())) .apply(RemoveDuplicates.<String>create()) - .apply(TextIO.Write.named("DedupedShakespeare") - .to(options.getOutput())); + .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput())); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index ab1fb66..ff4909b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -467,7 +467,7 @@ public class TriggerExample { TableReference tableRef = getTableReference(options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); - PCollectionList<TableRow> resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput") + PCollectionList<TableRow> resultList = pipeline.apply("ReadPubsubInput", PubsubIO.Read .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY) .topic(options.getPubsubTopic())) .apply(ParDo.of(new ExtractFlowInfo())) @@ -493,7 +493,7 @@ public class TriggerExample { copiedOptions.setJobName(options.getJobName() + "-injector"); Pipeline injectorPipeline = Pipeline.create(copiedOptions); injectorPipeline - 
.apply(TextIO.Read.named("ReadMyFile").from(options.getInput())) + .apply("ReadMyFile", TextIO.Read.from(options.getInput())) .apply("InsertRandomDelays", ParDo.of(new InsertDelays())) .apply(IntraBundleParallelization.of(PubsubFileInjector .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index ad8b49e..b1cb312 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -260,10 +260,8 @@ public class GameStats extends LeaderBoard { // Calculate the total score per user over fixed windows, and // cumulative updates for late data. final PCollectionView<Map<String, Integer>> spammersView = userEvents - .apply(Window.named("FixedWindowsUser") - .<KV<String, Integer>>into(FixedWindows.of( - Duration.standardMinutes(options.getFixedWindowDuration()))) - ) + .apply("FixedWindowsUser", Window.<KV<String, Integer>>into( + FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration())))) // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate. // These might be robots/spammers. @@ -278,10 +276,8 @@ public class GameStats extends LeaderBoard { // suspected robots-- to filter out scores from those users from the sum. // Write the results to BigQuery. 
rawEvents - .apply(Window.named("WindowIntoFixedWindows") - .<GameActionInfo>into(FixedWindows.of( - Duration.standardMinutes(options.getFixedWindowDuration()))) - ) + .apply("WindowIntoFixedWindows", Window.<GameActionInfo>into( + FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration())))) // Filter out the detected spammer users, using the side input derived above. .apply("FilterOutSpammers", ParDo .withSideInputs(spammersView) @@ -299,8 +295,8 @@ public class GameStats extends LeaderBoard { // [END DocInclude_FilterAndCalc] // Write the result to BigQuery .apply("WriteTeamSums", - new WriteWindowedToBigQuery<KV<String, Integer>>( - options.getTablePrefix() + "_team", configureWindowedWrite())); + new WriteWindowedToBigQuery<KV<String, Integer>>( + options.getTablePrefix() + "_team", configureWindowedWrite())); // [START DocInclude_SessionCalc] @@ -309,10 +305,9 @@ public class GameStats extends LeaderBoard { // This information could help the game designers track the changing user engagement // as their set of games changes. userEvents - .apply(Window.named("WindowIntoSessions") - .<KV<String, Integer>>into( - Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap()))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) + .apply("WindowIntoSessions", Window.<KV<String, Integer>>into( + Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap()))) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) // For this use, we care only about the existence of the session, not any particular // information aggregated over it, so the following is an efficient way to do that. .apply(Combine.perKey(x -> 0)) @@ -321,9 +316,8 @@ public class GameStats extends LeaderBoard { // [END DocInclude_SessionCalc] // [START DocInclude_Rewindow] // Re-window to process groups of session sums according to when the sessions complete. 
- .apply(Window.named("WindowToExtractSessionMean") - .<Integer>into( - FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration())))) + .apply("WindowToExtractSessionMean", Window.<Integer>into( + FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration())))) // Find the mean session duration in each window. .apply(Mean.<Integer>globally().withoutDefaults()) // Write this info to a BigQuery table. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index 7a808ac..e489607 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -178,9 +178,8 @@ public class HourlyTeamScore extends UserScore { // Add an element timestamp based on the event log, and apply fixed windowing. .apply("AddEventTimestamps", WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp()))) - .apply(Window.named("FixedWindowsTeam") - .<GameActionInfo>into(FixedWindows.of( - Duration.standardMinutes(options.getWindowDuration())))) + .apply("FixedWindowsTeam", Window.<GameActionInfo>into( + FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))) // [END DocInclude_HTSAddTsAndWindow] // Extract and sum teamname/score pairs from the event data. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index 2c608aa..a14d533 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -190,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore { // [START DocInclude_WindowAndTrigger] // Extract team/score pairs from the event stream, using hour-long windows by default. gameEvents - .apply(Window.named("LeaderboardTeamFixedWindows") - .<GameActionInfo>into(FixedWindows.of( - Duration.standardMinutes(options.getTeamWindowDuration()))) + .apply("LeaderboardTeamFixedWindows", Window.<GameActionInfo>into( + FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration()))) // We will get early (speculative) results as well as cumulative // processing of late data. .triggering( @@ -215,8 +214,7 @@ public class LeaderBoard extends HourlyTeamScore { // Extract user/score pairs from the event stream using processing time, via global windowing. // Get periodic updates on all users' running scores. gameEvents - .apply(Window.named("LeaderboardUserGlobalWindow") - .<GameActionInfo>into(new GlobalWindows()) + .apply("LeaderboardUserGlobalWindow", Window.<GameActionInfo>into(new GlobalWindows()) // Get periodic results every ten minutes. 
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(TEN_MINUTES))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java index 084ac7c..56737a4 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java @@ -191,8 +191,7 @@ public class TFIDF { } PCollection<KV<URI, String>> oneUriToLines = pipeline - .apply(TextIO.Read.from(uriString) - .named("TextIO.Read(" + uriString + ")")) + .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString)) .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri)); urisToLines = urisToLines.and(oneUriToLines); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java index 2817622..2d95c97 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java @@ -109,10 +109,10 @@ public class WordCount { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + p.apply("ReadLines", TextIO.Read.from(options.getInput())) .apply(new CountWords()) 
.apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); + .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java index ed11781..c0ff85d 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java @@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; @@ -380,14 +379,14 @@ public class AutoComplete { options.setExecutionRetryDelay(3000L); options.setRunner(FlinkRunner.class); - PTransform<? super PBegin, PCollection<String>> readSource = - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream"); - WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); + + WindowFn<Object, ?> windowFn = + FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); // Create the pipeline. 
Pipeline p = Pipeline.create(options); PCollection<KV<String, List<CompletionCandidate>>> toWrite = p - .apply(readSource) + .apply("WordStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) .apply(ParDo.of(new ExtractWordsFn())) .apply(Window.<String>into(windowFn) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java index 0828ddc..f456b27 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java @@ -24,7 +24,6 @@ import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.CoGbkResult; import org.apache.beam.sdk.transforms.join.CoGroupByKey; @@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; @@ -135,22 +133,19 @@ public class JoinExamples { options.setExecutionRetryDelay(3000L); options.setRunner(FlinkRunner.class); - PTransform<? 
super PBegin, PCollection<String>> readSourceA = - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); - PTransform<? super PBegin, PCollection<String>> readSourceB = - Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); - WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); Pipeline p = Pipeline.create(options); // the following two 'applys' create multiple inputs to our pipeline, one for each // of our two input sources. - PCollection<String> streamA = p.apply(readSourceA) + PCollection<String> streamA = p + .apply("FirstStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) .apply(Window.<String>into(windowFn) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) .discardingFiredPanes()); - PCollection<String> streamB = p.apply(readSourceB) + PCollection<String> streamB = p + .apply("SecondStream", Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3))) .apply(Window.<String>into(windowFn) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) .discardingFiredPanes()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java index b14c5ae..4e81420 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ 
-132,7 +132,7 @@ public class KafkaWindowedWordCountExample { new SimpleStringSchema(), p); PCollection<String> words = pipeline - .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer))) + .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer))) .apply(ParDo.of(new ExtractWordsFn())) .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java ---------------------------------------------------------------------- diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java index f72b705..1b532a7 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java @@ -119,7 +119,8 @@ public class WindowedWordCount { Pipeline pipeline = Pipeline.create(options); PCollection<String> words = pipeline - .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) + .apply("StreamingWordCount", + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) .apply(ParDo.of(new ExtractWordsFn())) .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) .every(Duration.standardSeconds(options.getSlide()))) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java 
---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d47d285..7ff247a 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2686,7 +2686,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> { try { Coder<T> coder = transform.getDefaultOutputCoder(input); return Pipeline.applyTransform( - input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/")) + "StartingSignal", input, PubsubIO.Read.subscription("_starting_signal/")) .apply(ParDo.of(new OutputNullKv())) .apply("GlobalSingleton", Window.<KV<Void, Void>>into(new GlobalWindows()) .triggering(AfterPane.elementCountAtLeast(1)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index c3a6a11..e04a1fc 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -127,8 +127,8 @@ public class DataflowPipelineTranslatorTest implements Serializable { options.setRunner(DataflowRunner.class); Pipeline p = Pipeline.create(options); 
- p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) - .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); + p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object")) + .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object")); return p; } @@ -458,7 +458,7 @@ public class DataflowPipelineTranslatorTest implements Serializable { // Create a pipeline that the predefined step will be embedded into Pipeline pipeline = Pipeline.create(options); - pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) + pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in")) .apply(ParDo.of(new NoOpFn())) .apply(new EmbeddedTransform(predefinedStep.clone())) .apply(ParDo.of(new NoOpFn())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index e094d0d..999dc3a 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow; import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -151,8 +152,8 @@ public class DataflowRunnerTest { options.setRunner(DataflowRunner.class); Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) - 
.apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); + p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object")) + .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object")); return p; } @@ -464,7 +465,7 @@ public class DataflowRunnerTest { ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath())); + p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath())); thrown.expectCause(Matchers.allOf( instanceOf(IllegalArgumentException.class), @@ -477,11 +478,11 @@ public class DataflowRunnerTest { @Test public void testNonGcsFilePathInWriteFailure() throws IOException { Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); - pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file")); + pc.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file")); } @Test @@ -489,8 +490,7 @@ public class DataflowRunnerTest { ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class); Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadInvalidGcsFile") - .from("gs://bucket/tmp//file")); + p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file")); thrown.expectCause(Matchers.allOf( instanceOf(IllegalArgumentException.class), @@ -502,11 +502,11 @@ public class DataflowRunnerTest { @Test public void testMultiSlashGcsFileWritePath() throws IOException { Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection<String> pc = 
p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("consecutive slashes"); - pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file")); + pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file")); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index da3fa7a..6a3edd7 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -89,8 +89,7 @@ public class SimpleWordCountTest { PCollection<String> output = inputWords.apply(new CountWords()); File outputFile = testFolder.newFile(); - output.apply( - TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding()); + output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding()); EvaluationResult res = SparkRunner.create().run(p); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 6a36c8d..7cac705 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ 
-518,7 +518,6 @@ public class BigQueryIO { + " query without a result flattening preference"); } - // Only verify existence/correctness if validation is enabled. if (validate) { // Check for source table/query presence for early failure notification. // Note that a presence check can fail if the table or dataset are created by earlier http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java index da886de..6e26d33 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java @@ -136,6 +136,18 @@ public class AvroIOGeneratedClassTest { return users; } + <T> void runTestRead( + String applyName, AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput) + throws Exception { + generateAvroFile(generateAvroObjects()); + + TestPipeline p = TestPipeline.create(); + PCollection<T> output = p.apply(applyName, read); + PAssert.that(output).containsInAnyOrder(expectedOutput); + p.run(); + assertEquals(expectedName, output.getName()); + } + <T> void runTestRead(AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput) throws Exception { generateAvroFile(generateAvroObjects()); @@ -158,28 +170,16 @@ public class AvroIOGeneratedClassTest { AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), "AvroIO.Read/Read.out", generateAvroObjects()); - runTestRead( - AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), + runTestRead("MyRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), "MyRead/Read.out", generateAvroObjects()); 
- runTestRead( - AvroIO.Read.named("MyRead").withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), + runTestRead("MyRead", + AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), "MyRead/Read.out", generateAvroObjects()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class).named("HerRead"), - "HerRead/Read.out", - generateAvroObjects()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(AvroGeneratedUser.class), - "HerRead/Read.out", - generateAvroObjects()); - runTestRead( - AvroIO.Read.withSchema(AvroGeneratedUser.class).named("HerRead").from(avroFile.getPath()), - "HerRead/Read.out", - generateAvroObjects()); - runTestRead( - AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()).named("HerRead"), + runTestRead("HerRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), "HerRead/Read.out", generateAvroObjects()); } @@ -195,28 +195,20 @@ public class AvroIOGeneratedClassTest { AvroIO.Read.withSchema(schema).from(avroFile.getPath()), "AvroIO.Read/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schema), + runTestRead("MyRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schema), "MyRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").withSchema(schema).from(avroFile.getPath()), + runTestRead("MyRead", + AvroIO.Read.withSchema(schema).from(avroFile.getPath()), "MyRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).withSchema(schema).named("HerRead"), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schema), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schema).named("HerRead").from(avroFile.getPath()), + 
runTestRead("HerRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schema), "HerRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schema).from(avroFile.getPath()).named("HerRead"), + runTestRead("HerRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schema), "HerRead/Read.out", generateAvroGenericRecords()); } @@ -232,28 +224,12 @@ public class AvroIOGeneratedClassTest { AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()), "AvroIO.Read/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schemaString), - "MyRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").withSchema(schemaString).from(avroFile.getPath()), + runTestRead("MyRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString), "MyRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString).named("HerRead"), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schemaString), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schemaString).named("HerRead").from(avroFile.getPath()), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()).named("HerRead"), + runTestRead("HerRead", + AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()), "HerRead/Read.out", generateAvroGenericRecords()); } @@ -276,106 +252,34 @@ public class AvroIOGeneratedClassTest { @Test @Category(NeedsRunner.class) public void testWriteFromGeneratedClass() throws Exception { - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class) - .to(avroFile.getPath()), - 
"AvroIO.Write"); - runTestWrite(AvroIO.Write.named("MyWrite") - .to(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class), - "MyWrite"); - runTestWrite(AvroIO.Write.named("MyWrite") - .withSchema(AvroGeneratedUser.class) - .to(avroFile.getPath()), - "MyWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class) - .named("HerWrite"), - "HerWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .named("HerWrite") - .withSchema(AvroGeneratedUser.class), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class) - .named("HerWrite") - .to(avroFile.getPath()), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class) - .to(avroFile.getPath()) - .named("HerWrite"), - "HerWrite"); + runTestWrite( + AvroIO.Write.to(avroFile.getPath()).withSchema(AvroGeneratedUser.class), + "AvroIO.Write"); + runTestWrite( + AvroIO.Write.withSchema(AvroGeneratedUser.class).to(avroFile.getPath()), + "AvroIO.Write"); } @Test @Category(NeedsRunner.class) public void testWriteFromSchema() throws Exception { - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schema), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.withSchema(schema) - .to(avroFile.getPath()), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.named("MyWrite") - .to(avroFile.getPath()) - .withSchema(schema), - "MyWrite"); - runTestWrite(AvroIO.Write.named("MyWrite") - .withSchema(schema) - .to(avroFile.getPath()), - "MyWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schema) - .named("HerWrite"), - "HerWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .named("HerWrite") - .withSchema(schema), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schema) - .named("HerWrite") - .to(avroFile.getPath()), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schema) - .to(avroFile.getPath()) - .named("HerWrite"), - "HerWrite"); + runTestWrite( + 
AvroIO.Write.to(avroFile.getPath()).withSchema(schema), + "AvroIO.Write"); + runTestWrite( + AvroIO.Write.withSchema(schema).to(avroFile.getPath()), + "AvroIO.Write"); } @Test @Category(NeedsRunner.class) public void testWriteFromSchemaString() throws Exception { - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schemaString), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.withSchema(schemaString) - .to(avroFile.getPath()), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.named("MyWrite") - .to(avroFile.getPath()) - .withSchema(schemaString), - "MyWrite"); - runTestWrite(AvroIO.Write.named("MyWrite") - .withSchema(schemaString) - .to(avroFile.getPath()), - "MyWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schemaString) - .named("HerWrite"), - "HerWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .named("HerWrite") - .withSchema(schemaString), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schemaString) - .named("HerWrite") - .to(avroFile.getPath()), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schemaString) - .to(avroFile.getPath()) - .named("HerWrite"), - "HerWrite"); + runTestWrite( + AvroIO.Write.to(avroFile.getPath()).withSchema(schemaString), + "AvroIO.Write"); + runTestWrite( + AvroIO.Write.withSchema(schemaString).to(avroFile.getPath()), + "AvroIO.Write"); } // TODO: for Write only, test withSuffix, withNumShards, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 13c1bcf..8625b10 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; 
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -81,10 +82,6 @@ public class AvroIOTest { public void testAvroIOGetName() { assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName()); assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName()); - assertEquals("ReadMyFile", - AvroIO.Read.named("ReadMyFile").from("gs://bucket/foo*/baz").getName()); - assertEquals("WriteMyFile", - AvroIO.Write.named("WriteMyFile").to("gs://bucket/foo/baz").getName()); } @DefaultCoder(AvroCoder.class) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index a1daf72..f0d3fce 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -340,8 +340,7 @@ public class BigQueryIOTest implements Serializable { checkReadTableObjectWithValidate(bound, project, dataset, table, true); } - private void checkReadQueryObject( - BigQueryIO.Read.Bound bound, String query) { + private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) { checkReadQueryObjectWithValidate(bound, query, true); } @@ -393,15 +392,13 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildTableBasedSource() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from("foo.com:project:somedataset.sometable"); + BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable"); 
checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable"); } @Test public void testBuildQueryBasedSource() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyQuery") - .fromQuery("foo_query"); + BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query"); checkReadQueryObject(bound, "foo_query"); } @@ -409,8 +406,8 @@ public class BigQueryIOTest implements Serializable { public void testBuildTableBasedSourceWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from("foo.com:project:somedataset.sometable").withoutValidation(); + BigQueryIO.Read.Bound bound = + BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation(); checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false); } @@ -418,15 +415,15 @@ public class BigQueryIOTest implements Serializable { public void testBuildQueryBasedSourceWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. 
- BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .fromQuery("some_query").withoutValidation(); + BigQueryIO.Read.Bound bound = + BigQueryIO.Read.fromQuery("some_query").withoutValidation(); checkReadQueryObjectWithValidate(bound, "some_query", false); } @Test public void testBuildTableBasedSourceWithDefaultProject() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from("somedataset.sometable"); + BigQueryIO.Read.Bound bound = + BigQueryIO.Read.from("somedataset.sometable"); checkReadTableObject(bound, null, "somedataset", "sometable"); } @@ -436,8 +433,7 @@ public class BigQueryIOTest implements Serializable { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from(table); + BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table); checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable"); } @@ -457,18 +453,7 @@ public class BigQueryIOTest implements Serializable { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); - } - - @Test - @Category(RunnableOnService.class) - public void testBuildSourceWithoutTableOrQuery() { - Pipeline p = TestPipeline.create(); - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "Invalid BigQuery read operation, either table reference or query has to be set"); - p.apply(BigQueryIO.Read.named("ReadMyTable")); - p.run(); + p.apply(BigQueryIO.Read.from(tableRef)); } @Test @@ -490,8 +475,8 @@ public class BigQueryIOTest implements Serializable { thrown.expectMessage( "Invalid BigQuery read operation. 
Specifies both a query and a table, only one of these" + " should be provided"); - p.apply( - BigQueryIO.Read.named("ReadMyTable") + p.apply("ReadMyTable", + BigQueryIO.Read .from("foo.com:project:somedataset.sometable") .fromQuery("query")); p.run(); @@ -505,8 +490,8 @@ public class BigQueryIOTest implements Serializable { thrown.expectMessage( "Invalid BigQuery read operation. Specifies a" + " table with a result flattening preference, which is not configurable"); - p.apply( - BigQueryIO.Read.named("ReadMyTable") + p.apply("ReadMyTable", + BigQueryIO.Read .from("foo.com:project:somedataset.sometable") .withoutResultFlattening()); p.run(); @@ -521,7 +506,7 @@ public class BigQueryIOTest implements Serializable { "Invalid BigQuery read operation. Specifies a" + " table with a result flattening preference, which is not configurable"); p.apply( - BigQueryIO.Read.named("ReadMyTable") + BigQueryIO.Read .from("foo.com:project:somedataset.sometable") .withoutValidation() .withoutResultFlattening()); @@ -644,8 +629,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSink() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("foo.com:project:somedataset.sometable"); + BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); checkWriteObject( bound, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -655,8 +639,8 @@ public class BigQueryIOTest implements Serializable { public void testBuildSinkwithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. 
- BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("foo.com:project:somedataset.sometable").withoutValidation(); + BigQueryIO.Write.Bound bound = + BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation(); checkWriteObjectWithValidate( bound, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false); @@ -664,8 +648,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSinkDefaultProject() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("somedataset.sometable"); + BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable"); checkWriteObject( bound, null, "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -677,8 +660,7 @@ public class BigQueryIOTest implements Serializable { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to(table); + BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table); checkWriteObject( bound, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -691,14 +673,14 @@ public class BigQueryIOTest implements Serializable { thrown.expect(IllegalStateException.class); thrown.expectMessage("must set the table reference"); p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.named("WriteMyTable")); + .apply(BigQueryIO.Write.withoutValidation()); } @Test public void testBuildSinkWithSchema() { TableSchema schema = new TableSchema(); - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("foo.com:project:somedataset.sometable").withSchema(schema); + BigQueryIO.Write.Bound bound = + BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema); 
checkWriteObject( bound, "foo.com:project", "somedataset", "sometable", schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -706,7 +688,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSinkWithCreateDispositionNever() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_NEVER); checkWriteObject( @@ -716,7 +698,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSinkWithCreateDispositionIfNeeded() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED); checkWriteObject( @@ -726,7 +708,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSinkWithWriteDispositionTruncate() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); checkWriteObject( @@ -736,7 +718,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSinkWithWriteDispositionAppend() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_APPEND); checkWriteObject( @@ -746,7 +728,7 @@ public class BigQueryIOTest implements Serializable { @Test public void testBuildSinkWithWriteDispositionEmpty() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") 
.withWriteDisposition(WriteDisposition.WRITE_EMPTY); checkWriteObject( @@ -794,7 +776,7 @@ public class BigQueryIOTest implements Serializable { Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.named("WriteMyTable") + .apply(BigQueryIO.Write .to(tableRef) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema())); @@ -878,8 +860,6 @@ public class BigQueryIOTest implements Serializable { public void testBigQueryIOGetName() { assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName()); assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName()); - assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName()); - assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName()); } @Test @@ -915,7 +895,7 @@ public class BigQueryIOTest implements Serializable { thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform"); TestPipeline.create() .apply(Create.<TableRow>of()) - .apply(BigQueryIO.Write.named("name")); + .apply("name", BigQueryIO.Write.withoutValidation()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index b0c577d..c9f4079 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -717,7 +718,7 @@ public class FileBasedSourceTest { File file = createFileWithData(fileName, data); TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 64, null); - PCollection<String> output = p.apply(Read.from(source).named("ReadFileData")); + PCollection<String> output = p.apply("ReadFileData", Read.from(source)); PAssert.that(output).containsInAnyOrder(data); p.run(); @@ -743,7 +744,7 @@ public class FileBasedSourceTest { TestFileBasedSource source = new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null); - PCollection<String> output = p.apply(Read.from(source).named("ReadFileData")); + PCollection<String> output = p.apply("ReadFileData", Read.from(source)); List<String> expectedResults = new ArrayList<String>(); expectedResults.addAll(data1); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index eaf452d..efa1cd2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -45,10 +45,6 @@ public class PubsubIOTest { PubsubIO.Read.topic("projects/myproject/topics/mytopic").getName()); assertEquals("PubsubIO.Write", 
PubsubIO.Write.topic("projects/myproject/topics/mytopic").getName()); - assertEquals("ReadMyTopic", - PubsubIO.Read.named("ReadMyTopic").topic("projects/myproject/topics/mytopic").getName()); - assertEquals("WriteMyTopic", - PubsubIO.Write.named("WriteMyTopic").topic("projects/myproject/topics/mytopic").getName()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index c3a5084..df598c8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -44,14 +45,12 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; import com.google.common.collect.ImmutableList; @@ -167,16 +166,9 @@ public class TextIOTest { } { - 
PCollection<String> output2 = - p.apply(TextIO.Read.named("MyRead").from(file)); + PCollection<String> output2 = p.apply("MyRead", TextIO.Read.from(file)); assertEquals("MyRead/Read.out", output2.getName()); } - - { - PCollection<String> output3 = - p.apply(TextIO.Read.from(file).named("HerRead")); - assertEquals("HerRead/Read.out", output3.getName()); - } } @Test @@ -299,27 +291,6 @@ public class TextIOTest { } @Test - public void testWriteNamed() { - { - PTransform<PCollection<String>, PDone> transform1 = - TextIO.Write.to("/tmp/file.txt"); - assertEquals("TextIO.Write", transform1.getName()); - } - - { - PTransform<PCollection<String>, PDone> transform2 = - TextIO.Write.named("MyWrite").to("/tmp/file.txt"); - assertEquals("MyWrite", transform2.getName()); - } - - { - PTransform<PCollection<String>, PDone> transform3 = - TextIO.Write.to("/tmp/file.txt").named("HerWrite"); - assertEquals("HerWrite", transform3.getName()); - } - } - - @Test @Category(NeedsRunner.class) public void testShardedWrite() throws Exception { runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5); @@ -620,12 +591,8 @@ public class TextIOTest { public void testTextIOGetName() { assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName()); assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName()); - assertEquals("ReadMyFile", TextIO.Read.named("ReadMyFile").from("somefile").getName()); - assertEquals("WriteMyFile", TextIO.Write.named("WriteMyFile").to("somefile").getName()); assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString()); - assertEquals( - "ReadMyFile [TextIO.Read]", TextIO.Read.named("ReadMyFile").from("somefile").toString()); } @Test http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java index eb65468..37e3881 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -582,7 +583,7 @@ public class XmlSourceTest { .withRecordClass(Train.class) .withMinBundleSize(1024); - PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData")); + PCollection<Train> output = p.apply("ReadFileData", Read.from(source)); List<Train> expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), @@ -672,7 +673,7 @@ public class XmlSourceTest { .withRecordElement("train") .withRecordClass(Train.class) .withMinBundleSize(1024); - PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData")); + PCollection<Train> output = p.apply("ReadFileData", Read.from(source)); PAssert.that(output).containsInAnyOrder(trains); p.run(); @@ -814,13 +815,13 @@ public class XmlSourceTest { Pipeline p = TestPipeline.create(); - XmlSource<Train> source = XmlSource.<Train>from(file.getParent() + "/" - + "temp*.xml") - .withRootElement("trains") - .withRecordElement("train") - .withRecordClass(Train.class) - .withMinBundleSize(1024); - PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData")); + XmlSource<Train> source = + XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml") + 
.withRootElement("trains") + .withRecordElement("train") + .withRecordClass(Train.class) + .withMinBundleSize(1024); + PCollection<Train> output = p.apply("ReadFileData", Read.from(source)); List<Train> expectedResults = new ArrayList<>(); expectedResults.addAll(trains1); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java index 0c992c4..08c3996 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java @@ -120,9 +120,9 @@ public class TransformTreeTest { Pipeline p = TestPipeline.create(); - p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath())) + p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath())) .apply(Sample.<String>any(10)) - .apply(TextIO.Write.named("WriteMyFile").to(outputFile.getPath())); + .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath())); final EnumSet<TransformsSeen> visited = EnumSet.noneOf(TransformsSeen.class); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index c858f32..76bc038 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -119,11 +119,11 @@ public class 
WindowTest implements Serializable { FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25)); WindowingStrategy<?, ?> strategy = TestPipeline.create() .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) - .apply(Window.named("WindowInto10").<String>into(fixed10) + .apply("WindowInto10", Window.<String>into(fixed10) .withAllowedLateness(Duration.standardDays(1)) .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5))) .accumulatingFiredPanes()) - .apply(Window.named("WindowInto25").<String>into(fixed25)) + .apply("WindowInto25", Window.<String>into(fixed25)) .getWindowingStrategy(); assertEquals(Duration.standardDays(1), strategy.getAllowedLateness()); @@ -272,7 +272,7 @@ public class WindowTest implements Serializable { @Test public void testDisplayDataExcludesUnspecifiedProperties() { - Window.Bound<?> onlyHasAccumulationMode = Window.named("foobar").discardingFiredPanes(); + Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes(); assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf( "windowFn", "trigger", http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 21f58df..c1e092a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -221,7 +221,7 @@ public class WindowingTest implements Serializable { Pipeline p = TestPipeline.create(); PCollection<String> output = p.begin() - .apply(TextIO.Read.named("ReadLines").from(filename)) + .apply("ReadLines", 
TextIO.Read.from(filename)) .apply(ParDo.of(new ExtractWordsWithTimestampsFn())) .apply(new WindowedCount(FixedWindows.of(Duration.millis(10)))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java index 3306cb4..c0e5b17 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java @@ -174,7 +174,7 @@ public class DebuggingWordCount { Pipeline p = Pipeline.create(options); PCollection<KV<String, Long>> filteredWords = - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) + p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new WordCount.CountWords()) .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/57195358/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java ---------------------------------------------------------------------- diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java index 07ed6d0..803e800 100644 --- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java +++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java @@ 
-195,10 +195,10 @@ public class WordCount { // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the // static FormatAsTextFn() to the ParDo transform. - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) + p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new CountWords()) .apply(ParDo.of(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); + .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); }
