lostluck commented on a change in pull request #13160:
URL: https://github.com/apache/beam/pull/13160#discussion_r510373174



##########
File path: website/www/site/content/en/documentation/programming-guide.md
##########
@@ -5143,3 +5143,282 @@ perUser.apply(ParDo.of(new DoFn<KV<String, ValueT>, OutputT>() {
   }
 }));
 {{< /highlight >}}
+
+## 12 Splittable DoFns {#splittable-dofns}
+
+Splittable DoFns (SDFs) enable users to create modular components containing I/Os (and some advanced
+[non-I/O use cases](https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv)). Having modular
+I/O components that can be connected to each other simplifies typical patterns that users want.
+For example, a popular use case is to read filenames from a message queue and then parse those
+files. Traditionally, users were required either to write a single I/O connector that contained the
+logic for both the message queue and the file reader (increased complexity) or to reuse a message
+queue I/O followed by a regular DoFn that read the file (decreased performance). With splittable DoFns,
+we bring the richness of Apache Beam’s I/O APIs to DoFns, enabling modularity while maintaining the
+performance of traditional I/O connectors.
+
+### 12.1 Splittable DoFn basics {#splittable-dofn-basics}
+
+At a high level, a splittable DoFn is responsible for processing element and restriction pairs. A
+restriction represents a subset of the work that needs to be done to process the element.
+
+Executing a splittable DoFn involves the following steps:
+
+1. Each element is paired with a restriction (e.g., a filename is paired with an offset range representing the whole file).
+2. Each element and restriction pair is split (e.g., offset ranges are broken up into smaller pieces).
+3. The runner redistributes the element and restriction pairs to several workers.
+4. Element and restriction pairs are processed in parallel (e.g., the file is read).
+
+![Diagram of steps that a splittable DoFn is composed of](/images/sdf_high_level_overview.svg)
+
+Within the last step, the element and restriction pair can pause its own processing and/or be split into
+further element and restriction pairs. This last step is what enables I/O-like capabilities for DoFns.
+
+
+#### 12.1.1 A basic splittable DoFn {#a-basic-splittable-dofn}
+
+A basic splittable DoFn is composed of three parts: a restriction, a restriction provider, and a
+restriction tracker. The restriction represents a subset of work for a given element.
+The restriction provider lets SDF authors override default implementations for splitting, sizing,
+watermark estimation, and so forth. In [Java](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L92)
+and [Go](https://github.com/apache/beam/blob/0f466e6bcd4ac8677c2bd9ecc8e6af3836b7f3b8/sdks/go/pkg/beam/pardo.go#L226),
+the restriction provider is the DoFn itself; [Python](https://github.com/apache/beam/blob/f4c2734261396858e388ebef2eef50e7d48231a8/sdks/python/apache_beam/transforms/core.py#L213)
+has a dedicated RestrictionProvider type. The restriction tracker is responsible for tracking
+which subset of the restriction has been completed during processing.
+
+To define a splittable DoFn, you must choose whether the splittable DoFn is bounded (default) or
+unbounded and define a way to create the initial restriction for an element.
+
+{{< highlight java >}}
+{{< code_sample "examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight py >}}
+{{< code_sample "sdks/python/apache_beam/examples/snippets/snippets.py" SDF_BasicExample >}}
+{{< /highlight >}}
+
+{{< highlight go >}}
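+import (
+	"os"
+
+	"github.com/apache/beam/sdks/go/pkg/beam/core/sdf"
+	"github.com/apache/beam/sdks/go/pkg/beam/io/rtrackers/offsetrange"
+)
+
+// getFileLength, seekToNextRecordBoundaryInFile, and readNextRecord are
+// file-format-specific helpers that are not shown here.
+
+// CreateInitialRestriction pairs the filename with an offset range covering the whole file.
+func (fn *splittableDoFn) CreateInitialRestriction(filename string) offsetrange.Restriction {
+	return offsetrange.Restriction{
+		Start: 0,
+		End:   getFileLength(filename),
+	}
+}
+
+// CreateTracker wraps an offset range tracker in a thread-safe LockRTracker.
+func (fn *splittableDoFn) CreateTracker(rest offsetrange.Restriction) *sdf.LockRTracker {
+	return sdf.NewLockRTracker(offsetrange.NewTracker(rest))
+}
+
+// ProcessElement reads records from the start of the restriction, claiming each
+// record's offset before emitting it, and stops as soon as a claim fails.
+func (fn *splittableDoFn) ProcessElement(rt *sdf.LockRTracker, filename string, emit func(int)) error {
+	file, err := os.Open(filename)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+
+	offset, err := seekToNextRecordBoundaryInFile(file, rt.GetRestriction().(offsetrange.Restriction).Start)
+	if err != nil {
+		return err
+	}
+	for rt.TryClaim(offset) {
+		record, newOffset := readNextRecord(file)
+		emit(record)
+		offset = newOffset
+	}
+	return nil
+}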
+{{< /highlight >}}
+
+At this point, we have a splittable DoFn that supports [runner-initiated splits](#runner-initiated-split)

Review comment:
       Consider pointing out that with *only* the above example code, the 
restrictions don't currently do anything extra WRT the same DoFn without the 
restriction handling. It's a non-splittable-dofn with vestigial extras.  
(there's probably a much better way to phrase this). 

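The four execution steps listed in the diff (pair, split, redistribute, process) can be illustrated with a minimal standalone sketch that uses only the Go standard library. It is not the Beam SDK: `OffsetRange`, `initialRestriction`, `splitRestriction`, and `processInParallel` are hypothetical names, the file is represented only by its length, and goroutines stand in for the workers a runner would redistribute the pairs to.

```go
package main

import (
	"sort"
	"sync"
)

// OffsetRange is a hypothetical half-open restriction [Start, End) over byte offsets.
type OffsetRange struct {
	Start, End int64
}

// Step 1: pair an element (here, a file of the given length) with a
// restriction covering the whole file.
func initialRestriction(fileLength int64) OffsetRange {
	return OffsetRange{Start: 0, End: fileLength}
}

// Step 2: split a restriction into pieces of at most chunk bytes.
func splitRestriction(r OffsetRange, chunk int64) []OffsetRange {
	var parts []OffsetRange
	for start := r.Start; start < r.End; start += chunk {
		end := start + chunk
		if end > r.End {
			end = r.End
		}
		parts = append(parts, OffsetRange{Start: start, End: end})
	}
	return parts
}

// Steps 3 and 4: hand each piece to its own goroutine (standing in for a
// worker) and process the pieces in parallel. process returns the offsets it
// handled; results are merged and sorted so the output is deterministic.
func processInParallel(parts []OffsetRange, process func(OffsetRange) []int64) []int64 {
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out []int64
	)
	for _, p := range parts {
		wg.Add(1)
		go func(p OffsetRange) {
			defer wg.Done()
			res := process(p)
			mu.Lock()
			out = append(out, res...)
			mu.Unlock()
		}(p)
	}
	wg.Wait()
	sort.Slice(out, func(i, j int) bool { return out[i] < out[j] })
	return out
}
```

In a real pipeline the runner, not the caller, decides how finely to split and where each piece runs; both are fixed here to keep the example small.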

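The restriction tracker's responsibility, recording which subset of the restriction has been completed, can be sketched with a simplified, single-threaded model of an offset-range tracker. `rangeTracker`, its `TryClaim` and `Completed` methods, and `newRangeTracker` are hypothetical names that only loosely mirror the claim-then-process loop in the Go example; they are not the Beam `offsetrange.Tracker` API.

```go
package main

// OffsetRange is a hypothetical half-open restriction [Start, End).
type OffsetRange struct {
	Start, End int64
}

// rangeTracker records which offsets of a restriction have been claimed.
// It is a simplified model and is not safe for concurrent use.
type rangeTracker struct {
	rest        OffsetRange
	lastClaimed int64 // Start-1 until the first successful claim
	done        bool  // set once a claim at or past End is attempted
}

func newRangeTracker(r OffsetRange) *rangeTracker {
	return &rangeTracker{rest: r, lastClaimed: r.Start - 1}
}

// TryClaim reports whether the caller may process the work at pos.
// Positions must increase and start at or after Start; a claim at or past
// End fails and marks the restriction as fully processed.
func (t *rangeTracker) TryClaim(pos int64) bool {
	if t.done || pos <= t.lastClaimed {
		return false
	}
	if pos >= t.rest.End {
		t.done = true
		return false
	}
	t.lastClaimed = pos
	return true
}

// Completed returns the prefix of the restriction that has been claimed so far.
func (t *rangeTracker) Completed() OffsetRange {
	return OffsetRange{Start: t.rest.Start, End: t.lastClaimed + 1}
}
```

Because a claim fails once the position reaches `End`, a loop written as `for tr.TryClaim(pos) { ... }` stops at the restriction boundary rather than at the end of the file.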

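The ability of an element and restriction pair to be split further while it is being processed (the last execution step in the diff) can be modeled by letting the tracker give away the unclaimed remainder of its range. This is a hypothetical, stdlib-only sketch: `splittableTracker`, `tryClaim`, and `trySplit` are illustrative names, and the rule of keeping the claimed prefix plus a `fraction` of the remaining work as the primary, with the rest returned as a residual, is an assumption made for the example.

```go
package main

import "errors"

// OffsetRange is a hypothetical half-open restriction [Start, End).
type OffsetRange struct {
	Start, End int64
}

// splittableTracker is a simplified, single-threaded tracker that supports
// splitting while processing is in progress.
type splittableTracker struct {
	rest        OffsetRange
	lastClaimed int64 // Start-1 until the first successful claim
}

func newSplittableTracker(r OffsetRange) *splittableTracker {
	return &splittableTracker{rest: r, lastClaimed: r.Start - 1}
}

// tryClaim succeeds for increasing positions inside the current restriction.
func (t *splittableTracker) tryClaim(pos int64) bool {
	if pos <= t.lastClaimed || pos >= t.rest.End {
		return false
	}
	t.lastClaimed = pos
	return true
}

// trySplit keeps the claimed prefix plus fraction of the unclaimed remainder
// as the primary restriction and returns the rest as the residual, which a
// runner could schedule on another worker.
func (t *splittableTracker) trySplit(fraction float64) (primary, residual OffsetRange, err error) {
	if fraction < 0 || fraction >= 1 {
		return OffsetRange{}, OffsetRange{}, errors.New("fraction must be in [0, 1)")
	}
	firstUnclaimed := t.lastClaimed + 1
	remaining := t.rest.End - firstUnclaimed
	splitAt := firstUnclaimed + int64(float64(remaining)*fraction)
	if splitAt >= t.rest.End {
		return OffsetRange{}, OffsetRange{}, errors.New("nothing left to split off")
	}
	residual = OffsetRange{Start: splitAt, End: t.rest.End}
	t.rest.End = splitAt // this tracker now owns only the primary
	return t.rest, residual, nil
}
```

With a fraction of 0, the primary is exactly the claimed prefix and the residual is all of the remaining work, which is one way to model a pair pausing its own processing and resuming later.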

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
