Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site 4f1473477 -> e2430eb4d
Revise Beam programming guide for new DoFn

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/303864a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/303864a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/303864a3

Branch: refs/heads/asf-site
Commit: 303864a311abca93170ee1693a42a5e265e37a35
Parents: 4f14734
Author: Kenneth Knowles <k...@google.com>
Authored: Mon Aug 8 10:09:43 2016 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Mon Aug 8 10:09:43 2016 -0700

----------------------------------------------------------------------
 learn/programming-guide.md | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/303864a3/learn/programming-guide.md
----------------------------------------------------------------------
diff --git a/learn/programming-guide.md b/learn/programming-guide.md
index 92cf17c..ac18ba6 100644
--- a/learn/programming-guide.md
+++ b/learn/programming-guide.md
@@ -271,11 +271,11 @@ A `DoFn` processes one element at a time from the input `PCollection`. When you
 static class ComputeWordLengthFn extends DoFn<String, Integer> { ... }
 ```
 
-Inside your `DoFn` subclass, you'll need to override the method `processElement`, where you provide the actual processing logic. You don't need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your override of `processElement` should accept an object of type `ProcessContext`. The `ProcessContext` object gives you access to an input element and a method for emitting an output element:
+Inside your `DoFn` subclass, you'll write a method annotated with `@ProcessElement` where you provide the actual processing logic. You don't need to manually extract the elements from the input collection; the Beam SDKs handle that for you. Your `@ProcessElement` method should accept an object of type `ProcessContext`. The `ProcessContext` object gives you access to an input element and a method for emitting an output element:
 
 ```java
 static class ComputeWordLengthFn extends DoFn<String, Integer> {
-  @Override
+  @ProcessElement
   public void processElement(ProcessContext c) {
     // Get the input element from ProcessContext.
     String word = c.element();
@@ -287,9 +287,9 @@ static class ComputeWordLengthFn extends DoFn<String, Integer> {
 > **Note:** If the elements in your input `PCollection` are key/value pairs,
 > you can access the key or value by using `ProcessContext.element().getKey()`
 > or `ProcessContext.element().getValue()`, respectively.
-A given `DoFn` instance generally gets invoked one or more times to process some arbitrary bundle of elements. However, Beam doesn't guarantee an exact number of invocations; it may be invoked multiple times on a given worker node to account for failures and retries. As such, you can cache information across multiple calls to `processElement`, but if you do so, make sure the implementation **does not depend on the number of invocations**.
+A given `DoFn` instance generally gets invoked one or more times to process some arbitrary bundle of elements. However, Beam doesn't guarantee an exact number of invocations; it may be invoked multiple times on a given worker node to account for failures and retries. As such, you can cache information across multiple calls to your `@ProcessElement` method, but if you do so, make sure the implementation **does not depend on the number of invocations**.
 
-When you override `processElement`, you'll need to meet some immutability requirements to ensure that Beam and the processing back-end can safely serialize and cache the values in your pipeline. Your method should meet the following requirements:
+In your `@ProcessElement` method, you'll also need to meet some immutability requirements to ensure that Beam and the processing back-end can safely serialize and cache the values in your pipeline. Your method should meet the following requirements:
 
 * You should not in any way modify an element returned by `ProcessContext.element()` or `ProcessContext.sideInput()` (the incoming elements from the input collection).
 * Once you output a value using `ProcessContext.output()` or `ProcessContext.sideOutput()`, you should not modify that value in any way.
@@ -310,7 +310,7 @@ PCollection<Integer> wordLengths = words.apply(
   ParDo
     .named("ComputeWordLengths")       // the transform name
     .of(new DoFn<String, Integer>() {  // a DoFn as an anonymous inner class instance
-      @Override
+      @ProcessElement
       public void processElement(ProcessContext c) {
         c.output(c.element().length());
       }
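With this change, a `DoFn` no longer overrides `processElement`. Instead, the SDK locates the processing method through its `@ProcessElement` annotation. The following is a minimal sketch of that general pattern in plain Java, with no Beam dependency: the method is looked up by annotation via reflection and then called once per element. `ProcessElement`, `Context`, `WordLengthFn`, and `invokeAnnotated` are hypothetical stand-ins for illustration. They are not Beam's classes, and Beam's actual invocation machinery is more involved.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

public class AnnotatedDispatchSketch {

  // Hypothetical stand-in for an @ProcessElement annotation. It must be retained
  // at runtime so that reflection can see it.
  @Retention(RetentionPolicy.RUNTIME)
  @Target(ElementType.METHOD)
  @interface ProcessElement {}

  // Hypothetical stand-in for ProcessContext: exposes one input element and collects outputs.
  static class Context<InputT, OutputT> {
    private final InputT element;
    private final List<OutputT> outputs;

    Context(InputT element, List<OutputT> outputs) {
      this.element = element;
      this.outputs = outputs;
    }

    InputT element() {
      return element;
    }

    void output(OutputT value) {
      outputs.add(value);
    }
  }

  // User code: nothing is overridden; the annotation marks the entry point.
  static class WordLengthFn {
    @ProcessElement
    public void processElement(Context<String, Integer> c) {
      c.output(c.element().length());
    }
  }

  // Finds the single public @ProcessElement method on fn's class and calls it once per element.
  static <InputT, OutputT> List<OutputT> invokeAnnotated(Object fn, List<InputT> inputs) {
    Method target = null;
    for (Method m : fn.getClass().getMethods()) {
      if (m.isAnnotationPresent(ProcessElement.class)) {
        if (target != null) {
          throw new IllegalStateException("More than one @ProcessElement method");
        }
        target = m;
      }
    }
    if (target == null) {
      throw new IllegalStateException("No @ProcessElement method found");
    }
    List<OutputT> outputs = new ArrayList<>();
    for (InputT element : inputs) {
      try {
        target.invoke(fn, new Context<InputT, OutputT>(element, outputs));
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException("Failed to invoke " + target.getName(), e);
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    List<Integer> lengths =
        invokeAnnotated(new WordLengthFn(), List.of("beam", "dofn", "pipeline"));
    System.out.println(lengths); // [4, 4, 8]
  }
}
```

Because the method is found by its annotation rather than by overriding a base-class method, the sketch does not care what the method is called. Only the annotation and the parameter type matter.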
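The rule against modifying a value after `ProcessContext.output()` matters because a runner may keep a reference to an emitted value instead of copying it. The following self-contained sketch simulates that situation. The hypothetical `output` method stores references in a list; this is an assumption for illustration, not Beam's API or a description of any particular runner. The rule-following function reads its input without changing it and emits a fresh list per element. The rule-breaking function reuses one buffer and mutates it after emitting it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;

public class ImmutabilitySketch {

  // Hypothetical sink standing in for a runner that keeps references to emitted
  // values (for example, to cache or serialize them later).
  static final List<List<String>> emitted = new ArrayList<>();

  static void output(List<String> value) {
    emitted.add(value); // stores the reference, not a copy
  }

  // Follows the rules: the input element is only read, and the emitted list is
  // freshly allocated and never touched again after output().
  static void upperCaseAll(List<String> element) {
    List<String> result = new ArrayList<>(element.size());
    for (String s : element) {
      result.add(s.toUpperCase(Locale.ROOT));
    }
    output(Collections.unmodifiableList(result));
  }

  // Breaks the rules: one buffer is reused across calls and mutated after it was output.
  static final List<String> reusedBuffer = new ArrayList<>();

  static void upperCaseAllUnsafe(List<String> element) {
    reusedBuffer.clear(); // modifies a value that an earlier call already emitted
    for (String s : element) {
      reusedBuffer.add(s.toUpperCase(Locale.ROOT));
    }
    output(reusedBuffer);
  }

  public static void main(String[] args) {
    upperCaseAll(List.of("a", "b"));
    upperCaseAll(List.of("c"));
    System.out.println(emitted); // [[A, B], [C]]

    emitted.clear();
    upperCaseAllUnsafe(List.of("a", "b"));
    upperCaseAllUnsafe(List.of("c"));
    System.out.println(emitted); // [[C], [C]]: the first output was silently overwritten
  }
}
```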
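The requirement that a cache must not make results depend on the number of invocations can be illustrated without Beam. In this sketch, `NormalizeFn` and `NumberingFn` are hypothetical classes. Retries are simulated by calling `process` again, either on the same instance or on a fresh one. The memoized normalization returns the same output however many times it runs, while the call counter returns a different result on a retry, which is the pattern to avoid.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

public class CachingSketch {

  // Acceptable: the cache is only an optimization. A repeated call, or a fresh
  // instance that starts with an empty cache, produces exactly the same output.
  static class NormalizeFn {
    private final Map<String, String> cache = new HashMap<>();

    String process(String word) {
      return cache.computeIfAbsent(word, w -> w.trim().toLowerCase(Locale.ROOT));
    }
  }

  // Not acceptable: the output depends on how many times process() has run on
  // this instance, so a retried element would be numbered differently.
  static class NumberingFn {
    private int calls = 0;

    String process(String word) {
      calls++;
      return calls + ":" + word;
    }
  }

  public static void main(String[] args) {
    NormalizeFn cached = new NormalizeFn();
    String first = cached.process(" Beam ");
    String retried = cached.process(" Beam ");          // simulated retry, same instance
    String fresh = new NormalizeFn().process(" Beam "); // simulated retry, new instance
    System.out.println(first + " " + retried + " " + fresh); // beam beam beam

    NumberingFn numbering = new NumberingFn();
    System.out.println(numbering.process("beam")); // 1:beam
    System.out.println(numbering.process("beam")); // 2:beam (a retry changes the result)
  }
}
```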