This is an automated email from the ASF dual-hosted git repository.
git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/asf-site by this push:
new ab8dcc0 Publishing website 2021/01/26 06:03:13 at commit dde38b6
ab8dcc0 is described below
commit ab8dcc0cceaa215064010e5e1a77be3c37ee8c7e
Author: jenkins <[email protected]>
AuthorDate: Tue Jan 26 06:03:13 2021 +0000
Publishing website 2021/01/26 06:03:13 at commit dde38b6
---
website/generated-content/documentation/index.xml | 138 ++++++++++++++++++---
.../documentation/programming-guide/index.html | 134 +++++++++++++++++---
website/generated-content/sitemap.xml | 2 +-
3 files changed, 239 insertions(+), 35 deletions(-)
diff --git a/website/generated-content/documentation/index.xml b/website/generated-content/documentation/index.xml
index 04e9667..4faffd6 100644
--- a/website/generated-content/documentation/index.xml
+++ b/website/generated-content/documentation/index.xml
@@ -6455,6 +6455,7 @@ read and modified inside the DoFn&rsquo;s <code>@ProcessElement</code>
registered, then Beam will automatically infer the coder for the state value.
Otherwise, a coder can be explicitly
specified when creating the ValueState. For example, the following ParDo
creates a single state variable that
accumulates the number of elements seen.</p>
+<p>Note: <code>ValueState</code> is called
<code>ReadModifyWriteState</code> in the Python SDK.</p>
<div class=language-java>
<div class="highlight"><pre class="chroma"><code
class="language-java" data-lang="java"><span
class="n">PCollection</span><span class="o">&lt;</span><span
class="n">KV</span><span class="o">&lt;</span><span
class="n">String</span><span class="o">,</span> <span
class="n">ValueT</span><span class="o">&gt;&gt;</span>
<span class="n">perUser</span> <span class="o">=</span> <span
class="n">readPerUser</ [...]
<span class="n">perUser</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="k">new</span> <span class="n">DoFn</span><span
class="o">&lt;</span><span class="n">KV</span><span
class="o">&lt;</span><span class="n">String</span><span
class="o">,</span [...]
@@ -6476,6 +6477,16 @@ accumulates the number of elements seen.</p>
<span class="o">...</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">ReadModifyWriteStateDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<span class="n">STATE_SPEC</span> <span class="o">=</span>
<span class="n">ReadModifyWriteStateSpec</span><span
class="p">(</span><span
class="s1">&#39;num_elements&#39;</span><span
class="p">,</span> <span class="n">VarIntCoder</span><span
class="p">())</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span> <span class="n">element</span><span
class="p">,</span> <span class="n">state</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">STATE_SPEC</span><span
class="p">)):</span>
+<span class="c1"># Read the number of elements seen so far for this user
key.</span>
+<span class="n">current_value</span> <span class="o">=</span>
<span class="n">state</span><span class="o">.</span><span
class="n">read</span><span class="p">()</span> <span
class="ow">or</span> <span class="mi">0</span>
+<span class="n">state</span><span class="o">.</span><span
class="n">write</span><span class="p">(</span><span
class="n">current_value</span><span class="o">+</span><span
class="mi">1</span><span class="p">)</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;state
pardo&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">ReadModifyWriteStateDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
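The per-key behaviour of the <code>ReadModifyWriteStateDoFn</code> above can be traced without a runner. The following is a plain-Python model, not the Beam API: <code>ReadModifyWriteCell</code> and <code>count_per_key</code> are hypothetical names, and windows, coders, and checkpointing are ignored. It mirrors the <code>read()</code>/<code>write()</code> calls used above, including <code>read()</code> returning <code>None</code> before the first write, which is why the DoFn writes <code>state.read() or 0</code>.

```python
from collections import defaultdict


class ReadModifyWriteCell:
    """Hypothetical stand-in for one key's ReadModifyWriteState (not Beam's class)."""

    def __init__(self):
        self._value = None

    def read(self):
        # Returns None until a value has been written for this key.
        return self._value

    def write(self, value):
        self._value = value

    def clear(self):
        self._value = None


def count_per_key(elements):
    """Counts (key, value) elements per key the way ReadModifyWriteStateDoFn does."""
    # One independent state cell per key, as a runner would provide.
    cells = defaultdict(ReadModifyWriteCell)
    for key, _value in elements:
        state = cells[key]
        current_value = state.read() or 0  # same idiom as the DoFn above
        state.write(current_value + 1)
    return {key: cell.read() for key, cell in cells.items()}


counts = count_per_key([("alice", "a"), ("bob", "b"), ("alice", "c")])
print(counts)  # {'alice': 2, 'bob': 1}
```

Each key sees only its own cell, so the two <code>alice</code> elements share a counter while <code>bob</code> starts from <code>None</code>.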
<h4 id="combiningstate">CombiningState</h4>
<p><code>CombiningState</code> allows you to create a state object
that is updated using a Beam combiner. For example, the previous
<code>ValueState</code> example could be rewritten to use
<code>CombiningState</code>.</p>
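With a combining state the counter no longer reads before it writes: each <code>add()</code> is folded into the state by its combine function. A plain-Python sketch of that difference follows; <code>CombiningCell</code> and <code>total_per_key</code> are hypothetical names, windows and coders are ignored, and keeping every input until <code>read()</code> is a simplification (a real combiner can hold a compact accumulator instead). The <code>sum</code> and <code>max</code> callables are the ones the Python examples in this guide pass to <code>CombiningValueStateSpec</code>.

```python
from collections import defaultdict


class CombiningCell:
    """Hypothetical stand-in for one key's CombiningValueState (not Beam's class)."""

    def __init__(self, combine_fn):
        self._combine_fn = combine_fn  # e.g. sum or max
        self._inputs = []

    def add(self, value):
        # No read is needed first; the value is simply accumulated.
        self._inputs.append(value)

    def read(self):
        return self._combine_fn(self._inputs)

    def clear(self):
        self._inputs = []


def total_per_key(elements, combine_fn=sum):
    """Adds 1 per (key, value) element, like state.add(1) in CombiningStateDoFn."""
    cells = defaultdict(lambda: CombiningCell(combine_fn))
    for key, _value in elements:
        cells[key].add(1)
    return {key: cell.read() for key, cell in cells.items()}


print(total_per_key([("alice", "a"), ("bob", "b"), ("alice", "c")]))  # {'alice': 2, 'bob': 1}

# The same cell type with max tracks the largest timestamp seen for a key.
latest = CombiningCell(max)
for ts in (10, 42, 7):
    latest.add(ts)
print(latest.read())  # 42
```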
@@ -6489,10 +6500,10 @@ accumulates the number of elements seen.</p>
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
-<div class=language-python>
-<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">CombiningStateDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">CombiningStateDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
<span class="n">SUM_TOTAL</span> <span class="o">=</span> <span
class="n">CombiningValueStateSpec</span><span
class="p">(</span><span
class="s1">&#39;total&#39;</span><span class="p">,</span>
<span class="nb">sum</span><span class="p">)</span>
-<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span> <span class="n">element</span><span
class="p">,</span> <span class="n">state</span><span
class="o">=</span><span class="n">SoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">SUM_TOTAL</span><span
class="p">)):</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span> <span class="n">element</span><span
class="p">,</span> <span class="n">state</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">SUM_TOTAL</span><span
class="p">)):</span>
<span class="n">state</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="mi">1</span><span class="p">)</span>
<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
<span class="o">|</span> <span class="s1">&#39;Combine state
pardo&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">CombiningStateDoFn</span><span
class="p">()))</span></code></pre></div>
@@ -6520,8 +6531,8 @@ bags larger than available memory.</p>
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
-<div class=language-python>
-<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">BagStateDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">BagStateDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span> <span class="n">element_pair</span><span
class="p">,</span> <span class="n">state</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">ALL_ELEMENTS</span><span
class="p" [...]
<span class="n">state</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="n">element_pair</span><span class="p">[</span><span
class="mi">1</span><span class="p">])</span>
@@ -6553,6 +6564,9 @@ runner can prefetch all of the states necessary. For example:</p>
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="n">This</span> <span
class="ow">is</span> <span class="ow">not</span> <span
class="n">supported</span> <span class="n">yet</span><span
class="p">,</span> <span class="n">see</span> <span
class="n">BEAM</span><span class="o">-</span><span
class="mf">11506.</span></code></pre></div>
+</div>
<p>If, however, there are code paths in which the states are not fetched,
then annotating with <code>@AlwaysFetched</code> will add
unnecessary fetching for those paths. In this case, the <code>readLater</code> method
allows the runner to know that the state will
be read in the future, allowing multiple state reads to be batched
together.</p>
@@ -6606,8 +6620,8 @@ allows for event-time aggregations.</p>
</span><span class="c1"></span> <span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
-<div class=language-python>
-<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">EventTimerDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">EventTimerDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
<span class="n">TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;timer&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">WATERMARK</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
@@ -6644,8 +6658,8 @@ to the current time. In Java, the <code>Timer.offset</code> and <code>T
</span><span class="c1"></span> <span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
-<div class=language-python>
-<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">ProcessingTimerDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">ProcessingTimerDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
<span class="n">TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;timer&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">REAL_TIME</span><span class="p">)</span>
<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
@@ -6663,11 +6677,13 @@ to the current time. In Java, the <code>Timer.offset</code> and <code>T
<span class="o">|</span> <span class="s1">&#39;ProcessingTime
timer pardo&#39;</span> <span class="o">&gt;&gt;</span>
<span class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">ProcessingTimerDoFn</span><span
class="p">()))</span></code></pre></div>
</div>
<h4 id="dynamic-timer-tags">11.3.3. Dynamic timer tags</h4>
-<p>Beam also supports dynamically setting a timer tag using
<code>TimerMap</code>. This allows for setting multiple different timers
+<p>Beam also supports dynamically setting a timer tag using
<code>TimerMap</code> in the Java SDK. This allows setting multiple
different timers
in a <code>DoFn</code>, with the timer tags chosen dynamically,
e.g. based on data in the input elements. A
timer with a specific tag can only be set to a single timestamp, so setting
the timer again has the effect of
overwriting the previous expiration time for the timer with that tag. Each
<code>TimerMap</code> is identified with a timer family
id, and timers in different timer families are independent.</p>
+<p>In the Python SDK, a dynamic timer tag can be specified when calling
<code>set()</code> or <code>clear()</code>. If no tag is specified, the timer
+tag defaults to an empty string.</p>
<div class=language-java>
<div class="highlight"><pre class="chroma"><code
class="language-java" data-lang="java"><span
class="n">PCollection</span><span class="o">&lt;</span><span
class="n">KV</span><span class="o">&lt;</span><span
class="n">String</span><span class="o">,</span> <span
class="n">ValueT</span><span class="o">&gt;&gt;</span>
<span class="n">perUser</span> <span class="o">=</span> <span
class="n">readPerUser</ [...]
<span class="n">perUser</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="k">new</span> <span class="n">DoFn</span><span
class="o">&lt;</span><span class="n">KV</span><span
class="o">&lt;</span><span class="n">String</span><span
class="o">,</span [...]
@@ -6684,8 +6700,28 @@ id, and timers in different timer families are independent.</p>
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
-<div class=language-python>
-<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="n">To</span>
<span class="n">be</span> <span class="n">supported</span><span
class="p">,</span> <span class="n">See</span> <span
class="n">BEAM</span><span class="o">-</span><span
class="mi">9602</span></code></pre></div>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">TimerDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
+<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
+<span class="n">TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;timer&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">REAL_TIME</span><span class="p">)</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
+<span class="n">element_pair</span><span class="p">,</span>
+<span class="nb">buffer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">),</span>
+<span class="n">timer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimerParam</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)):</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="n">element_pair</span><span class="p">[</span><span
class="mi">1</span><span class="p">])</span>
+<span class="c1"># Set a timer to go off 30 seconds in the future with
dynamic timer tag &#39;first_timer&#39;.</span>
+<span class="c1"># And set a timer to go off 60 seconds in the future with
dynamic timer tag &#39;second_timer&#39;.</span>
+<span class="n">timer</span><span class="o">.</span><span
class="n">set</span><span class="p">(</span><span
class="n">Timestamp</span><span class="o">.</span><span
class="n">now</span><span class="p">()</span> <span
class="o">+</span> <span class="n">Duration</span><span
class="p">(</span><span class="n">seconds</span><span
class="o">=</span><span class="mi">30</span><span
class="p">),</span> <sp [...]
+<span class="n">timer</span><span class="o">.</span><span
class="n">set</span><span class="p">(</span><span
class="n">Timestamp</span><span class="o">.</span><span
class="n">now</span><span class="p">()</span> <span
class="o">+</span> <span class="n">Duration</span><span
class="p">(</span><span class="n">seconds</span><span
class="o">=</span><span class="mi">60</span><span
class="p">),</span> <sp [...]
+<span class="c1"># Note that a timer can also be explicitly cleared if
previously set with a dynamic timer tag:</span>
+<span class="c1"># timer.clear(dynamic_timer_tag=...)</span>
+<span class="nd">@on_timer</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)</span>
+<span class="k">def</span> <span
class="nf">expiry_callback</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span> <span
class="nb">buffer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">),</span> <span
class="n">timer_tag</span><span [...]
+<span class="c1"># Process the timer. The dynamic timer tag associated with the
expiring timer can be read back with DoFn.DynamicTimerTagParam.</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="k">yield</span> <span class="p">(</span><span
class="n">timer_tag</span><span class="p">,</span> <span
class="s1">&#39;fired&#39;</span><span class="p">)</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;ProcessingTime
timer pardo&#39;</span> <span class="o">&gt;&gt;</span>
<span class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">TimerDoFn</span><span
class="p">()))</span></code></pre></div>
</div>
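The tag semantics in the dynamic timer example above (one expiration time per tag, re-setting a tag overwrites it, tags are independent, the default tag is an empty string) can be traced with a small plain-Python model of one key's timer family. This is a sketch, not Beam's implementation: <code>DynamicTimerModel</code> and <code>advance_to</code> are hypothetical names, timestamps are plain integers, and firing due timers in timestamp order once a clock reaches them is an assumption of the model. Only the <code>set(timestamp, dynamic_timer_tag=...)</code> / <code>clear(dynamic_timer_tag=...)</code> call shape follows the example.

```python
class DynamicTimerModel:
    """Hypothetical model of one key's timer family with dynamic tags."""

    def __init__(self):
        self._expirations = {}  # dynamic_timer_tag -> expiration timestamp

    def set(self, timestamp, dynamic_timer_tag=""):
        # Setting the same tag again overwrites its previous expiration time.
        self._expirations[dynamic_timer_tag] = timestamp

    def clear(self, dynamic_timer_tag=""):
        self._expirations.pop(dynamic_timer_tag, None)

    def advance_to(self, now):
        """Yields (tag, timestamp) for every timer due at or before `now`, oldest first."""
        due = sorted(
            (ts, tag) for tag, ts in self._expirations.items() if ts <= now)
        for ts, tag in due:
            del self._expirations[tag]  # a fired timer is no longer set
            yield tag, ts


timer = DynamicTimerModel()
timer.set(30, dynamic_timer_tag="first_timer")
timer.set(60, dynamic_timer_tag="second_timer")
timer.set(45, dynamic_timer_tag="first_timer")  # overwrites the earlier 30
timer.set(50)                                   # uses the default tag ""
timer.clear(dynamic_timer_tag="second_timer")

print(list(timer.advance_to(100)))  # [('first_timer', 45), ('', 50)]
```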
<h4 id="timer-output-timestamps">11.3.4. Timer output timestamps</h4>
<p>By default, event-time timers will hold the output watermark of the
<code>ParDo</code> to the timestamp of the timer. This means
@@ -6778,6 +6814,9 @@ past the timestamp of the minimum element. The following code demonstrates this.
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="n">Timer</span> <span
class="n">output</span> <span class="n">timestamps</span> <span
class="ow">is</span> <span class="ow">not</span> <span
class="n">yet</span> <span class="n">supported</span> <span
class="ow">in</span> <span class="n">Python</span> <span
class="n">SDK</span><span class="o">.</span> <span class="n [...]
+</div>
<h3 id="garbage-collecting-state">11.4. Garbage collecting state</h3>
<p>Per-key state needs to be garbage collected, or eventually the
increasing size of state may negatively impact
performance. There are two common strategies for garbage collecting
state.</p>
@@ -6802,8 +6841,8 @@ garbage-collection strategy.</p>
</span><span class="c1"></span> <span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
-<div class=language-python>
-<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">StateDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">StateDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
<span class="n">element_pair</span><span class="p">,</span>
@@ -6851,8 +6890,8 @@ This can be done by updating a timer that garbage collects state. For example<
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
-<div class=language-python>
-<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">UserDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">UserDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;state&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
<span class="n">MAX_TIMESTAMP</span> <span class="o">=</span>
<span class="n">CombiningValueStateSpec</span><span
class="p">(</span><span
class="s1">&#39;max_timestamp_seen&#39;</span><span
class="p">,</span> <span class="nb">max</span><span
class="p">)</span>
<span class="n">TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;gc-timer&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">WATERMARK</span><span class="p">)</span>
@@ -6896,7 +6935,7 @@ event before the view event. The one hour join timeout should be based on event
</span><span class="c1"></span><span
class="n">PCollection</span><span class="o">&lt;</span><span
class="n">KV</span><span class="o">&lt;</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Event</span><span class="o">&gt;&gt;</span> <span
class="n">eventsPerLinkId</span> <span class="o">=</span>
<span class="n">readEvents</span><span class="o">()</span>
<span class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">WithKeys</span><span
class="o">.</span><span class="na">of</span><span
class="o">(</span><span class="n">Event</span><span
class="o">::</span><span class="n">getLinkId</span><span
class="o">).</span><span class="na">withKeyType</span><span
class="o">(</span><span class="n">TypeDescriptors</span><span
class="o"> [...]
-<span class="n">perUser</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="k">new</span> <span class="n">DoFn</span><span
class="o">&lt;</span><span class="n">KV</span><span
class="o">&lt;</span><span class="n">String</span><span
class="o">,</span [...]
+<span class="n">eventsPerLinkId</span><span
class="o">.</span><span class="na">apply</span><span
class="o">(</span><span class="n">ParDo</span><span
class="o">.</span><span class="na">of</span><span
class="o">(</span><span class="k">new</span> <span
class="n">DoFn</span><span class="o">&lt;</span><span
class="n">KV</span><span class="o">&lt;</span><span
class="n">String</span><span class="o">,& [...]
<span class="c1">// Store the view event.
</span><span class="c1"></span> <span
class="nd">@StateId</span><span class="o">(</span><span
class="s">&#34;view&#34;</span><span class="o">)</span>
<span class="kd">private</span> <span class="kd">final</span>
<span class="n">StateSpec</span><span
class="o">&lt;</span><span class="n">ValueState</span><span
class="o">&lt;</span><span class="n">Event</span><span
class="o">&gt;&gt;</sp [...]
<span class="c1">// Store the click event.
@@ -6951,6 +6990,50 @@ event before the view event. The one hour join timeout should be based on event
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">JoinDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
+<span class="c1"># stores the view event.</span>
+<span class="n">VIEW_STATE_SPEC</span> <span class="o">=</span>
<span class="n">ReadModifyWriteStateSpec</span><span
class="p">(</span><span
class="s1">&#39;view&#39;</span><span class="p">,</span>
<span class="n">EventCoder</span><span class="p">())</span>
+<span class="c1"># stores the click event.</span>
+<span class="n">CLICK_STATE_SPEC</span> <span class="o">=</span>
<span class="n">ReadModifyWriteStateSpec</span><span
class="p">(</span><span
class="s1">&#39;click&#39;</span><span class="p">,</span>
<span class="n">EventCoder</span><span class="p">())</span>
+<span class="c1"># The maximum element timestamp value seen so
far.</span>
+<span class="n">MAX_TIMESTAMP</span> <span class="o">=</span>
<span class="n">CombiningValueStateSpec</span><span
class="p">(</span><span
class="s1">&#39;max_timestamp_seen&#39;</span><span
class="p">,</span> <span class="nb">max</span><span
class="p">)</span>
+<span class="c1"># Timer that fires when an hour goes by with an incomplete
join.</span>
+<span class="n">GC_TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;gc&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">WATERMARK</span><span class="p">)</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
+<span class="n">element</span><span class="p">,</span>
+<span class="n">view</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">VIEW_STATE_SPEC</span><span class="p">),</span>
+<span class="n">click</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">CLICK_STATE_SPEC</span><span class="p">),</span>
+<span class="n">max_timestamp_seen</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">MAX_TIMESTAMP</span><span
class="p">),</span>
+<span class="n">ts</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimestampParam</span><span class="p">,</span>
+<span class="n">gc</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimerParam</span><span class="p">(</span><span
class="n">GC_TIMER</span><span class="p">)):</span>
+<span class="n">event</span> <span class="o">=</span> <span
class="n">element</span><span class="p">[</span><span
class="mi">1</span><span class="p">]</span>
+<span class="k">if</span> <span class="n">event</span><span
class="o">.</span><span class="n">type</span> <span
class="o">==</span> <span
class="s1">&#39;view&#39;</span><span class="p">:</span>
+<span class="n">view</span><span class="o">.</span><span
class="n">write</span><span class="p">(</span><span
class="n">event</span><span class="p">)</span>
+<span class="k">else</span><span class="p">:</span>
+<span class="n">click</span><span class="o">.</span><span
class="n">write</span><span class="p">(</span><span
class="n">event</span><span class="p">)</span>
+<span class="n">previous_view</span> <span class="o">=</span>
<span class="n">view</span><span class="o">.</span><span
class="n">read</span><span class="p">()</span>
+<span class="n">previous_click</span> <span class="o">=</span>
<span class="n">click</span><span class="o">.</span><span
class="n">read</span><span class="p">()</span>
+<span class="c1"># We&#39;ve seen both a view and a click. Output a
joined event and clear state.</span>
+<span class="k">if</span> <span class="n">previous_view</span>
<span class="ow">and</span> <span
class="n">previous_click</span><span class="p">:</span>
+<span class="k">yield</span> <span class="p">(</span><span
class="n">previous_view</span><span class="p">,</span> <span
class="n">previous_click</span><span class="p">)</span>
+<span class="n">view</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">click</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">max_timestamp_seen</span><span
class="o">.</span><span class="n">clear</span><span
class="p">()</span>
+<span class="k">else</span><span class="p">:</span>
+<span class="n">max_timestamp_seen</span><span
class="o">.</span><span class="n">add</span><span
class="p">(</span><span class="n">ts</span><span
class="p">)</span>
+<span class="n">gc</span><span class="o">.</span><span
class="n">set</span><span class="p">(</span><span
class="n">max_timestamp_seen</span><span class="o">.</span><span
class="n">read</span><span class="p">()</span> <span
class="o">+</span> <span class="n">Duration</span><span
class="p">(</span><span class="n">seconds</span><span
class="o">=</span><span class="mi">3600</span><span
class="p">))</span>
+<span class="nd">@on_timer</span><span class="p">(</span><span
class="n">GC_TIMER</span><span class="p">)</span>
+<span class="k">def</span> <span
class="nf">gc_callback</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span>
+<span class="n">view</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">VIEW_STATE_SPEC</span><span class="p">),</span>
+<span class="n">click</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">CLICK_STATE_SPEC</span><span class="p">),</span>
+<span class="n">max_timestamp_seen</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">MAX_TIMESTAMP</span><span
class="p">)):</span>
+<span class="n">view</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">click</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">max_timestamp_seen</span><span
class="o">.</span><span class="n">clear</span><span
class="p">()</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span
class="s1">&#39;EventsPerLinkId&#39;</span> <span
class="o">&gt;&gt;</span> <span
class="n">ReadPerLinkEvents</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;Join
DoFn&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">JoinDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
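The JoinDoFn above keeps per-link state until one of two things happens: the matching view or click arrives, or the garbage-collection timer fires one hour after the newest event seen for that key.

The following is a minimal pure-Python model of that behavior, not Beam code. Several assumptions apply:
- JoinSimulator, KeyState, process and advance_watermark are hypothetical names.
- Timestamps are plain integer seconds.
- The GC timer is modeled as an event-time (watermark) timer that fires once the watermark reaches its deadline.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

# Mirrors Duration(seconds=3600) in JoinDoFn; timestamps here are plain ints (assumption).
GC_DELAY_SECONDS = 3600


@dataclass
class KeyState:
    """Per-key state: one slot per event type plus the max timestamp seen."""
    view: Optional[str] = None
    click: Optional[str] = None
    max_timestamp_seen: Optional[int] = None
    gc_timer: Optional[int] = None  # event-time deadline for clearing this key


class JoinSimulator:
    """Hypothetical single-process model of JoinDoFn; not part of Beam."""

    def __init__(self) -> None:
        self.state: Dict[str, KeyState] = {}
        self.output: List[Tuple[str, str]] = []

    def process(self, key: str, kind: str, payload: str, ts: int) -> None:
        s = self.state.setdefault(key, KeyState())
        if kind == 'view':
            s.view = payload
        elif kind == 'click':
            s.click = payload
        if s.view is not None and s.click is not None:
            # Both sides have arrived: emit the joined pair, clear the key's state.
            self.output.append((s.view, s.click))
            del self.state[key]
        else:
            # Still waiting: move the GC deadline to one hour past the newest event.
            if s.max_timestamp_seen is None or ts > s.max_timestamp_seen:
                s.max_timestamp_seen = ts
            s.gc_timer = s.max_timestamp_seen + GC_DELAY_SECONDS

    def advance_watermark(self, watermark: int) -> None:
        # In this model, an event-time timer fires once the watermark reaches its deadline.
        expired = [key for key, s in self.state.items()
                   if s.gc_timer is not None and s.gc_timer <= watermark]
        for key in expired:
            # Same effect as gc_callback: view, click and max_timestamp_seen are cleared.
            del self.state[key]
```

In this model, a view that never receives a matching click is dropped once the watermark passes one hour beyond the latest event for its key. Without that, its state would be held indefinitely.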
<h4 id="batching-rpcs">11.5.2. Batching RPCs</h4>
<p>In this example, input elements are being forwarded to an external RPC
service. The RPC accepts batch requests:
multiple events for the same user can be batched in a single RPC call. Since
this RPC service also imposes rate limits,
@@ -6987,6 +7070,27 @@ we want to batch ten seconds worth of events together in
order to reduce the num
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-py>
+<div class="highlight"><pre class="chroma"><code class="language-py"
data-lang="py"><span class="k">class</span> <span
class="nc">BufferDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
+<span class="n">BUFFER</span> <span class="o">=</span> <span
class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">EventCoder</span><span class="p">())</span>
+<span class="n">IS_TIMER_SET</span> <span class="o">=</span>
<span class="n">ReadModifyWriteStateSpec</span><span
class="p">(</span><span
class="s1">&#39;is_timer_set&#39;</span><span
class="p">,</span> <span class="n">BooleanCoder</span><span
class="p">())</span>
+<span class="n">OUTPUT</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;output&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">REAL_TIME</span><span class="p">)</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span> <span class="n">element</span><span class="p">,</span>
+<span class="nb">buffer</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">BUFFER</span><span class="p">),</span>
+<span class="n">is_timer_set</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">IS_TIMER_SET</span><span
class="p">),</span>
+<span class="n">timer</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimerParam</span><span class="p">(</span><span
class="n">OUTPUT</span><span class="p">)):</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="n">element</span><span class="p">)</span>
+<span class="k">if</span> <span class="ow">not</span> <span
class="n">is_timer_set</span><span class="o">.</span><span
class="n">read</span><span class="p">():</span>
+<span class="n">timer</span><span class="o">.</span><span
class="n">set</span><span class="p">(</span><span
class="n">Timestamp</span><span class="o">.</span><span
class="n">now</span><span class="p">()</span> <span
class="o">+</span> <span class="n">Duration</span><span
class="p">(</span><span class="n">seconds</span><span
class="o">=</span><span class="mi">10</span><span
class="p">))</span>
+<span class="n">is_timer_set</span><span
class="o">.</span><span class="n">write</span><span
class="p">(</span><span class="bp">True</span><span
class="p">)</span>
+<span class="nd">@on_timer</span><span class="p">(</span><span
class="n">OUTPUT</span><span class="p">)</span>
+<span class="k">def</span> <span
class="nf">output_callback</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span>
+<span class="nb">buffer</span><span class="o">=</span><span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">BUFFER</span><span class="p">),</span>
+<span class="n">is_timer_set</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">IS_TIMER_SET</span><span
class="p">)):</span>
+<span class="n">send_rpc</span><span class="p">(</span><span
class="nb">list</span><span class="p">(</span><span
class="nb">buffer</span><span class="o">.</span><span
class="n">read</span><span class="p">()))</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">is_timer_set</span><span
class="o">.</span><span class="n">clear</span><span
class="p">()</span></code></pre></div>
+</div>
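BufferDoFn buffers events and arms a processing-time timer only when the first event of a batch arrives. When that timer fires, it sends the whole buffer in one RPC.

The following is a minimal single-key, pure-Python model of that flow with an explicit clock, not Beam code. BufferSimulator, advance_processing_time and the send_rpc callback are hypothetical stand-ins, and times are plain float seconds.

```python
from typing import Callable, List, Optional

# Mirrors Duration(seconds=10) in BufferDoFn; times are plain float seconds (assumption).
BATCH_DELAY_SECONDS = 10.0


class BufferSimulator:
    """Hypothetical single-key model of BufferDoFn with an explicit clock."""

    def __init__(self, send_rpc: Callable[[List[str]], None]) -> None:
        self.send_rpc = send_rpc
        self.buffer: List[str] = []
        # None plays the role of an unset IS_TIMER_SET flag.
        self.timer_deadline: Optional[float] = None

    def process(self, element: str, now: float) -> None:
        self.buffer.append(element)
        # Only the first element of a batch arms the timer; later ones just join the batch.
        if self.timer_deadline is None:
            self.timer_deadline = now + BATCH_DELAY_SECONDS

    def advance_processing_time(self, now: float) -> None:
        # A processing-time timer fires once the clock reaches its deadline.
        if self.timer_deadline is not None and now >= self.timer_deadline:
            self.send_rpc(list(self.buffer))  # one RPC carries the whole batch
            self.buffer.clear()
            self.timer_deadline = None  # equivalent of is_timer_set.clear()
```

The flag matters because setting a timer again overwrites its previous expiration time. If every element reset the timer, a steady stream of events could keep pushing the flush into the future.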
<h2 id="splittable-dofns">12. Splittable <code>DoFns</code></h2>
<p>A Splittable <code>DoFn</code> (SDF) enables users to create
modular components containing I/Os (and some advanced
<a href="https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv">non
I/O use cases</a>). Having modular
diff --git
a/website/generated-content/documentation/programming-guide/index.html
b/website/generated-content/documentation/programming-guide/index.html
index 637506d..b1d044c 100644
--- a/website/generated-content/documentation/programming-guide/index.html
+++ b/website/generated-content/documentation/programming-guide/index.html
@@ -2280,7 +2280,7 @@ to other nodes in the graph. A <code>DoFn</code> can
declare multiple state vari
read and modified inside the DoFn’s <code>@ProcessElement</code> or
<code>@OnTimer</code> methods. If the type of the ValueState has a coder
registered, then Beam will automatically infer the coder for the state value.
Otherwise, a coder can be explicitly
specified when creating the ValueState. For example, the following ParDo
creates a single state variable that
-accumulates the number of elements seen.</p><div class=language-java><div
class=highlight><pre class=chroma><code class=language-java
data-lang=java><span class=n>PCollection</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>></span> <span
class=n>perUser</span> <span class=o>=</span> <span
class=n>readPerUser</span><span class=o>();</span>
+accumulates the number of elements seen.</p><p>Note: <code>ValueState</code>
is called <code>ReadModifyWriteState</code> in the Python SDK.</p><div
class=language-java><div class=highlight><pre class=chroma><code
class=language-java data-lang=java><span class=n>PCollection</span><span
class=o><</span><span class=n>KV</span><span class=o><</span><span
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span
class=o>>></span> <span class=n>perUser</span> <sp [...]
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>,</span> <span
class=n>OutputT</span><span class=o>>()</span> <span class=o>{</span>
<span class=nd>@StateId</span><span class=o>(</span><span
class=s>"state"</span><span class=o>)</span> <span
class=kd>private</span> <span class=kd>final</span> <span
class=n>StateSpec</span><span class=o><</span><span
class=n>ValueState</span><span class=o><</span><span
class=n>Integer</span><span class=o>>></span> <span
class=n>numElements</span> <span class=o>=</span> <span
class=n>StateSpecs</span><span class=o>.</span><span class=na>value</span><span
class=o>() [...]
@@ -2295,7 +2295,16 @@ accumulates the number of elements seen.</p><div
class=language-java><div class=
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>,</span> <span
class=n>OutputT</span><span class=o>>()</span> <span class=o>{</span>
<span class=nd>@StateId</span><span class=o>(</span><span
class=s>"state"</span><span class=o>)</span> <span
class=kd>private</span> <span class=kd>final</span> <span
class=n>StateSpec</span><span class=o><</span><span
class=n>ValueState</span><span class=o><</span><span
class=n>MyType</span><span class=o>>></span> <span
class=n>numElements</span> <span class=o>=</span> <span
class=n>StateSpecs</span><span class=o>.</span><span class=na>value</span><span
class=o>(</ [...]
<span class=o>...</span>
-<span class=o>}));</span></code></pre></div></div><h4
id=combiningstate>CombiningState</h4><p><code>CombiningState</code> allows you
to create a state object that is updated using a Beam combiner. For example,
the previous
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>ReadModifyWriteStateDoFn</span><span
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+ <span class=n>STATE_SPEC</span> <span class=o>=</span> <span
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span
class=s1>'num_elements'</span><span class=p>,</span> <span
class=n>VarIntCoder</span><span class=p>())</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=n>element</span><span class=p>,</span> <span class=n>state</span><span
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>STATE_SPEC</span><span class=p>)):</span>
+ <span class=c1># Read the number of elements seen so far for this user
key.</span>
+ <span class=n>current_value</span> <span class=o>=</span> <span
class=n>state</span><span class=o>.</span><span class=n>read</span><span
class=p>()</span> <span class=ow>or</span> <span class=mi>0</span>
+ <span class=n>state</span><span class=o>.</span><span
class=n>write</span><span class=p>(</span><span
class=n>current_value</span><span class=o>+</span><span class=mi>1</span><span
class=p>)</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'state pardo'</span>
<span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>ReadModifyWriteStateDoFn</span><span
class=p>()))</span></code></pre></div></div><h4
id=combiningstate>CombiningState</h4><p><code>CombiningState</code> allows you
to create a state object that is updated using a Beam combiner. For example,
the previous
<code>ValueState</code> example could be rewritten to use
<code>CombiningState</code></p><div class=language-java><div
class=highlight><pre class=chroma><code class=language-java
data-lang=java><span class=n>PCollection</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>></span> <span
class=n>perUser</span> <span class=o>=</span> <span
class=n>readPerUser</span><s [...]
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>,</span> <span
class=n>OutputT</span><span class=o>>()</span> <span class=o>{</span>
<span class=nd>@StateId</span><span class=o>(</span><span
class=s>"state"</span><span class=o>)</span> <span
class=kd>private</span> <span class=kd>final</span> <span
class=n>StateSpec</span><span class=o><</span><span
class=n>CombiningState</span><span class=o><</span><span
class=n>Integer</span><span class=o>,</span> <span class=kt>int</span><span
class=o>[],</span> <span class=n>Integer</span><span class=o>>></span>
<span class=n>numElements</span> <span class=o> [...]
@@ -2304,10 +2313,10 @@ accumulates the number of elements seen.</p><div
class=language-java><div class=
<span class=nd>@ProcessElement</span> <span class=kd>public</span> <span
class=kt>void</span> <span class=nf>process</span><span class=o>(</span><span
class=nd>@StateId</span><span class=o>(</span><span
class=s>"state"</span><span class=o>)</span> <span
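class=n>CombiningState</span><span class=o><</span><span
class=n>Integer</span><span class=o>,</span> <span class=kt>int</span><span class=o>[],</span> <span class=n>Integer</span><span class=o>></span> <span class=n>state</span><span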
class=o>)</span> <span class=o>{</span>
<span class=n>state</span><span class=o>.</span><span
class=na>add</span><span class=o>(</span><span class=n>1</span><span
class=o>);</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>CombiningStateDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>CombiningStateDoFn</span><span
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
<span class=n>SUM_TOTAL</span> <span class=o>=</span> <span
class=n>CombiningValueStateSpec</span><span class=p>(</span><span
class=s1>'total'</span><span class=p>,</span> <span
class=nb>sum</span><span class=p>)</span>
- <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=n>element</span><span class=p>,</span> <span class=n>state</span><span
class=o>=</span><span class=n>SoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>SUM_TOTAL</span><span class=p>)):</span>
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=n>element</span><span class=p>,</span> <span class=n>state</span><span
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>SUM_TOTAL</span><span class=p>)):</span>
<span class=n>state</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=mi>1</span><span
class=p>)</span>
<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
@@ -2330,7 +2339,7 @@ bags larger than available memory.</p><div
class=language-java><div class=highli
<span class=n>state</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span> <span class=c1>// Clear the
state for this key.
</span><span class=c1></span> <span class=o>}</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>BagStateDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>BagStateDoFn</span><span
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
<span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
<span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=n>element_pair</span><span class=p>,</span> <span
class=n>state</span><span class=o>=</span><span class=n>DoFn</span><span
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
@@ -2358,7 +2367,7 @@ runner can prefetch all of the states necessary. For
example:</p><div class=lang
<span class=n>state2</span><span class=o>.</span><span
class=na>read</span><span class=o>();</span>
<span class=n>state3</span><span class=o>.</span><span
class=na>read</span><span class=o>();</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><p>If however there are code
paths in which the states are not fetched, then annotating with @AlwaysFetched
will add
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=n>This</span> <span class=ow>is</span> <span class=ow>not</span> <span
class=n>supported</span> <span class=n>yet</span><span class=p>,</span> <span
class=n>see</span> <span class=n>BEAM</span><span class=o>-</span><span
class=mf>11506.</span></code></pre></div></div><p>If, however, there are code
paths in which the states are not [...]
unnecessary fetching for those paths. In this case, the readLater method
allows the runner to know that the state will
be read in the future, allowing multiple state reads to be batched
together.</p><div class=language-java><div class=highlight><pre
class=chroma><code class=language-java data-lang=java><span
class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span
class=o><</span><span class=n>String</span><span class=o>,</span> <span
class=n>ValueT</span><span class=o>>></span> <span class=n>perUser</span>
<span class=o>=</span> <span class=n>readPerUser</span><span class [...]
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>,</span> <span
class=n>OutputT</span><span class=o>>()</span> <span class=o>{</span>
@@ -2404,7 +2413,7 @@ allows for event-time aggregations.</p><div
class=language-java><div class=highl
<span class=nd>@OnTimer</span><span class=o>(</span><span
class=s>"timer"</span><span class=o>)</span> <span
class=kd>public</span> <span class=kt>void</span> <span
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
<span class=c1>//Process timer.
</span><span class=c1></span> <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>EventTimerDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>EventTimerDoFn</span><span
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
<span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
<span class=n>TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'timer'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>WATERMARK</span><span class=p>)</span>
@@ -2438,7 +2447,7 @@ to the current time. In Java, the
<code>Timer.offset</code> and <code>Timer.setR
<span class=nd>@OnTimer</span><span class=o>(</span><span
class=s>"timer"</span><span class=o>)</span> <span
class=kd>public</span> <span class=kt>void</span> <span
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
<span class=c1>//Process timer.
</span><span class=c1></span> <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>ProcessingTimerDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>ProcessingTimerDoFn</span><span
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
<span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
<span class=n>TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'timer'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>REAL_TIME</span><span class=p>)</span>
@@ -2456,11 +2465,12 @@ to the current time. In Java, the
<code>Timer.offset</code> and <code>Timer.setR
<span class=n>state</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
- <span class=o>|</span> <span class=s1>'ProcessingTime timer
pardo'</span> <span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>ProcessingTimerDoFn</span><span
class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3.
Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag
using <code>TimerMap</code>. This allows for setting multiple different timers
+ <span class=o>|</span> <span class=s1>'ProcessingTime timer
pardo'</span> <span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>ProcessingTimerDoFn</span><span
class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3.
Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag
using <code>TimerMap</code> in the Java SDK. This allows for setting multiple
[...]
in a <code>DoFn</code> and allowing for the timer tags to be dynamically
chosen, for example based on data in the input elements. A
timer with a specific tag can only be set to a single timestamp, so setting
the timer again has the effect of
overwriting the previous expiration time for the timer with that tag. Each
<code>TimerMap</code> is identified with a timer family
-id, and timers in different timer families are independent.</p><div
class=language-java><div class=highlight><pre class=chroma><code
class=language-java data-lang=java><span class=n>PCollection</span><span
class=o><</span><span class=n>KV</span><span class=o><</span><span
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span
class=o>>></span> <span class=n>perUser</span> <span class=o>=</span>
<span class=n>readPerUser</span><span class=o>();</span>
+id, and timers in different timer families are independent.</p><p>In the
Python SDK, a dynamic timer tag can be passed to
<code>set()</code> or <code>clear()</code> through the <code>dynamic_timer_tag</code> argument. If no tag is
+given, the timer tag defaults to an empty string.</p><div class=language-java><div
class=highlight><pre class=chroma><code class=language-java
data-lang=java><span class=n>PCollection</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>></span> <span
class=n>perUser</span> <span class=o>=</span> <span
class=n>readPerUser</span><span class=o>();</span>
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>,</span> <span
class=n>OutputT</span><span class=o>>()</span> <span class=o>{</span>
<span class=nd>@TimerFamily</span><span class=o>(</span><span
class=s>"actionTimers"</span><span class=o>)</span> <span
class=kd>private</span> <span class=kd>final</span> <span
class=n>TimerSpec</span> <span class=n>timer</span> <span class=o>=</span>
<span class=n>TimerSpecs</span><span class=o>.</span><span
class=na>timerMap</span><span class=o>(</span><span
class=n>TimeDomain</span><span class=o>.</span><span
class=na>EVENT_TIME</span><span class=o>);</span>
@@ -2475,7 +2485,30 @@ id, and timers in different timer families are
independent.</p><div class=langua
<span class=nd>@OnTimerFamily</span><span class=o>(</span><span
class=s>"actionTimers"</span><span class=o>)</span> <span
class=kd>public</span> <span class=kt>void</span> <span
class=nf>onTimer</span><span class=o>(</span><span class=nd>@TimerId</span>
<span class=n>String</span> <span class=n>timerId</span><span class=o>)</span>
<span class=o>{</span>
<span class=n>LOG</span><span class=o>.</span><span
class=na>info</span><span class=o>(</span><span class=s>"Timer fired with
id "</span> <span class=o>+</span> <span class=n>timerId</span><span
class=o>);</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=n>To</span> <span
class=n>be</span> <span class=n>supported</span><span class=p>,</span> <span
class=n>See</span> <span class=n>BEAM</span><span class=o>-</span><span
class=mi>9602</span></code></pre></div></div><h4
id=timer-output-timestamps>11.3.4. Timer output timestamps</h4><p>By default,
event-time timers will ho [...]
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>TimerDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+ <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
+ <span class=n>TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'timer'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>REAL_TIME</span><span class=p>)</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>element_pair</span><span class=p>,</span>
+ <span class=nb>buffer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+ <span class=n>timer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+ <span class=c1># Set a timer to go off 30 seconds in the future with
dynamic timer tag 'first_timer'.</span>
+ <span class=c1># And set a timer to go off 60 seconds in the future with
dynamic timer tag 'second_timer'.</span>
+ <span class=n>timer</span><span class=o>.</span><span
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span
class=o>.</span><span class=n>now</span><span class=p>()</span> <span
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span
class=n>seconds</span><span class=o>=</span><span class=mi>30</span><span
class=p>),</span> <span class=n>dynamic_timer_tag</span><span
class=o>=</span><span class=s1>'first_timer'</span><span
class=p>)</span>
+ <span class=n>timer</span><span class=o>.</span><span
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span
class=o>.</span><span class=n>now</span><span class=p>()</span> <span
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span
class=n>seconds</span><span class=o>=</span><span class=mi>60</span><span
class=p>),</span> <span class=n>dynamic_timer_tag</span><span
class=o>=</span><span class=s1>'second_timer'</span><span
class=p>)</span>
+ <span class=c1># Note that a timer can also be explicitly cleared if
previously set with a dynamic timer tag:</span>
+ <span class=c1># timer.clear(dynamic_timer_tag=...)</span>
+
+ <span class=nd>@on_timer</span><span class=p>(</span><span
class=n>TIMER</span><span class=p>)</span>
+ <span class=k>def</span> <span class=nf>expiry_callback</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span
class=n>ALL_ELEMENTS</span><span class=p>),</span> <span
class=n>timer_tag</span><span class=o>=</span><span class=n>DoFn</span><span
class=o>.</span><span class=n>DynamicTimerTagParam</span><span cl [...]
+ <span class=c1># Process the timer. The dynamic timer tag associated with the
expiring timer can be read back with DoFn.DynamicTimerTagParam.</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=k>yield</span> <span class=p>(</span><span
class=n>timer_tag</span><span class=p>,</span> <span
class=s1>'fired'</span><span class=p>)</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'ProcessingTime timer
pardo'</span> <span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>TimerDoFn</span><span class=p>()))</span></code></pre></div></div><h4
id=timer-output-timestamps>11.3.4. Timer output timestamps</h4><p>By default,
event-time timers will hold the output watermark of the <code>ParDo</code> to
the timestamp of the timer. This means
that if a timer is set to 12pm, any windowed aggregations or event-time timers
later in the pipeline graph that finish
after 12pm will not expire. The timestamp of the timer is also the default
output timestamp for the timer callback. This
means that any elements output from the onTimer method will have a timestamp
equal to the timestamp of the timer firing.
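As a simplified illustration of that hold, the framework-free Python model below computes a stage's output watermark as the minimum of its input watermark and the timestamps of its pending event-time timers. The function `output_watermark` and its integer "hour" timestamps are hypothetical and are not part of any Beam API. This is a sketch of the concept, not how a runner implements it.

```python
def output_watermark(input_watermark, pending_timer_timestamps):
    """Hypothetical model of a watermark hold.

    Each pending event-time timer holds the stage's output watermark at the
    timer's timestamp, and the output watermark never runs ahead of the
    input watermark.
    """
    holds = list(pending_timer_timestamps)
    if not holds:
        # No pending timers: nothing holds the watermark back.
        return input_watermark
    return min(input_watermark, min(holds))
```

With a timer pending for 12 (noon) and the input watermark at 13, `output_watermark(13, [12])` stays at 12, so downstream windows that end after noon cannot expire yet.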
@@ -2562,7 +2595,7 @@ past the timestamp of the minimum element. The following
code demonstrates this.
<span class=c1>// Note that the timer has now fired.
</span><span class=c1></span> <span class=n>timerTimestamp</span><span
class=o>.</span><span class=na>clear</span><span class=o>();</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h3
id=garbage-collecting-state>11.4. Garbage collecting state</h3><p>Per-key state
needs to be garbage collected, or eventually the increasing size of state may
negatively impact
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=n>Timer</span> <span class=n>output</span> <span
class=n>timestamps</span> <span class=n>are</span> <span class=ow>not</span>
<span class=n>yet</span> <span class=n>supported</span> <span
class=ow>in</span> <span class=n>the</span> <span class=n>Python</span> <span class=n>SDK</span><span
class=o>.</span> <span class=n>See</span> <span class=n>BEAM</span [...]
performance. There are two common strategies for garbage collecting
state.</p><h5 id=using-windows-for-garbage-collection>11.4.1. <strong>Using
windows for garbage collection</strong></h5><p>All state and timers for a key
are scoped to the window they are in. This means that the ParDo sees different
values for the state depending on the window into which the input element's
timestamp falls. In
addition, once the input watermark passes the end of the window, the runner
should garbage collect all state for that
@@ -2578,7 +2611,7 @@ garbage-collection strategy.</p><p>For example, given the
following:</p><div cla
<span class=c1>// The state is scoped to a calendar day window.
That means that if the input timestamp ts is after
</span><span class=c1></span> <span class=c1>// midnight PST,
then a new copy of the state will be seen for the next day.
</span><span class=c1></span> <span class=o>}</span>
- <span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span
class=p>):</span>
+ <span class=o>}));</span></code></pre></div></div><div
class=language-py><div class=highlight><pre class=chroma><code
class=language-py data-lang=py><span class=k>class</span> <span
class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span
class=p>):</span>
<span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
<span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
@@ -2625,7 +2658,7 @@ This can be done by updating a timer that garbage
collects state. For example</p
</span><span class=c1></span> <span class=n>state</span><span
class=o>.</span><span class=na>clear</span><span class=o>();</span>
<span class=n>maxTimestamp</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span>
<span class=o>}</span>
- <span class=o>}</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>UserDoFn</span><span class=p>(</span><span class=n>DoFn</span><span
class=p>):</span>
+ <span class=o>}</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>UserDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
<span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'state'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
<span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span
class=n>CombiningValueStateSpec</span><span class=p>(</span><span
class=s1>'max_timestamp_seen'</span><span class=p>,</span> <span
class=nb>max</span><span class=p>)</span>
<span class=n>TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'gc-timer'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>WATERMARK</span><span class=p>)</span>
@@ -2667,7 +2700,7 @@ event before the view event. The one hour join timeout
should be based on event
<span class=n>readEvents</span><span class=o>()</span>
<span class=o>.</span><span class=na>apply</span><span
class=o>(</span><span class=n>WithKeys</span><span class=o>.</span><span
class=na>of</span><span class=o>(</span><span class=n>Event</span><span
class=o>::</span><span class=n>getLinkId</span><span class=o>).</span><span
class=na>withKeyType</span><span class=o>(</span><span
class=n>TypeDescriptors</span><span class=o>.</span><span
class=na>strings</span><span class=o>()));</span>
-<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>Event</span><span class=o>>,</span> <span
class=n>JoinedEvent</span><span class=o>>()</span> <span class=o [...]
+<span class=n>eventsPerLinkId</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>Event</span><span class=o>>,</span> <span
class=n>JoinedEvent</span><span class=o>>()</span> <span [...]
<span class=c1>// Store the view event.
</span><span class=c1></span> <span class=nd>@StateId</span><span
class=o>(</span><span class=s>"view"</span><span class=o>)</span> <span
class=kd>private</span> <span class=kd>final</span> <span
class=n>StateSpec</span><span class=o><</span><span
class=n>ValueState</span><span class=o><</span><span
class=n>Event</span><span class=o>>></span> <span
class=n>viewState</span> <span class=o>=</span> <span
class=n>StateSpecs</span><span class=o>.</span><span class=na>valu [...]
<span class=c1>// Store the click event.
@@ -2726,7 +2759,54 @@ event before the view event. The one hour join timeout
should be based on event
<span class=n>clickState</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span>
<span class=n>maxTimestampState</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span>
<span class=o>}</span>
- <span class=o>}));</span></code></pre></div></div><h4
id=batching-rpcs>11.5.2. Batching RPCs</h4><p>In this example, input elements
are being forwarded to an external RPC service. The RPC accepts batch requests -
+ <span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>JoinDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+ <span class=c1># Stores the view event.</span>
+ <span class=n>VIEW_STATE_SPEC</span> <span class=o>=</span> <span
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span
class=s1>'view'</span><span class=p>,</span> <span
class=n>EventCoder</span><span class=p>())</span>
+ <span class=c1># Stores the click event.</span>
+ <span class=n>CLICK_STATE_SPEC</span> <span class=o>=</span> <span
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span
class=s1>'click'</span><span class=p>,</span> <span
class=n>EventCoder</span><span class=p>())</span>
+ <span class=c1># The maximum element timestamp value seen so far.</span>
+ <span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span
class=n>CombiningValueStateSpec</span><span class=p>(</span><span
class=s1>'max_timestamp_seen'</span><span class=p>,</span> <span
class=nb>max</span><span class=p>)</span>
+ <span class=c1># Timer that fires when an hour goes by with an incomplete
join.</span>
+ <span class=n>GC_TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'gc'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>WATERMARK</span><span class=p>)</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>element</span><span class=p>,</span>
+ <span class=n>view</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>VIEW_STATE_SPEC</span><span class=p>),</span>
+ <span class=n>click</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>CLICK_STATE_SPEC</span><span class=p>),</span>
+ <span class=n>max_timestamp_seen</span><span
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>MAX_TIMESTAMP</span><span class=p>),</span>
+ <span class=n>ts</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span
class=n>TimestampParam</span><span class=p>,</span>
+ <span class=n>gc</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span
class=p>(</span><span class=n>GC_TIMER</span><span class=p>)):</span>
+ <span class=n>event</span> <span class=o>=</span> <span
class=n>element</span>
+ <span class=k>if</span> <span class=n>event</span><span
class=o>.</span><span class=n>type</span> <span class=o>==</span> <span
class=s1>'view'</span><span class=p>:</span>
+ <span class=n>view</span><span class=o>.</span><span
class=n>write</span><span class=p>(</span><span class=n>event</span><span
class=p>)</span>
+ <span class=k>else</span><span class=p>:</span>
+ <span class=n>click</span><span class=o>.</span><span
class=n>write</span><span class=p>(</span><span class=n>event</span><span
class=p>)</span>
+
+ <span class=n>previous_view</span> <span class=o>=</span> <span
class=n>view</span><span class=o>.</span><span class=n>read</span><span
class=p>()</span>
+ <span class=n>previous_click</span> <span class=o>=</span> <span
class=n>click</span><span class=o>.</span><span class=n>read</span><span
class=p>()</span>
+
+ <span class=c1># We've seen both a view and a click. Output a joined
event and clear state.</span>
+ <span class=k>if</span> <span class=n>previous_view</span> <span
class=ow>and</span> <span class=n>previous_click</span><span class=p>:</span>
+ <span class=k>yield</span> <span class=p>(</span><span
class=n>previous_view</span><span class=p>,</span> <span
class=n>previous_click</span><span class=p>)</span>
+ <span class=n>view</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=n>click</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=n>max_timestamp_seen</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=k>else</span><span class=p>:</span>
+ <span class=n>max_timestamp_seen</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=n>ts</span><span
class=p>)</span>
+ <span class=n>gc</span><span class=o>.</span><span
class=n>set</span><span class=p>(</span><span
class=n>max_timestamp_seen</span><span class=o>.</span><span
class=n>read</span><span class=p>()</span> <span class=o>+</span> <span
class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span
class=o>=</span><span class=mi>3600</span><span class=p>))</span>
+
+ <span class=nd>@on_timer</span><span class=p>(</span><span
class=n>GC_TIMER</span><span class=p>)</span>
+ <span class=k>def</span> <span class=nf>gc_callback</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>view</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>VIEW_STATE_SPEC</span><span class=p>),</span>
+ <span class=n>click</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>CLICK_STATE_SPEC</span><span class=p>),</span>
+ <span class=n>max_timestamp_seen</span><span
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>MAX_TIMESTAMP</span><span class=p>)):</span>
+ <span class=n>view</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=n>click</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=n>max_timestamp_seen</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span
class=s1>'EventsPerLinkId'</span> <span class=o>>></span> <span
class=n>ReadPerLinkEvents</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'Join DoFn'</span> <span
class=o>>></span> <span class=n>beam</span><span class=o>.</span><span
class=n>ParDo</span><span class=p>(</span><span class=n>JoinDoFn</span><span
class=p>()))</span></code></pre></div></div><h4 id=batching-rpcs>11.5.2.
Batching RPCs</h4><p>In this example, input elements are being forwarded to an
external RPC service. The RPC accepts batch requests, so
multiple events for the same user can be batched in a single RPC call. Since
this RPC service also imposes rate limits,
we want to batch ten seconds' worth of events together in order to reduce the
number of calls.</p><div class=language-java><div class=highlight><pre
class=chroma><code class=language-java data-lang=java><span
class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span
class=o><</span><span class=n>String</span><span class=o>,</span> <span
class=n>ValueT</span><span class=o>>></span> <span class=n>perUser</span>
<span class=o>=</span> <span class=n>readPerUser< [...]
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>,</span> <span
class=n>OutputT</span><span class=o>>()</span> <span class=o>{</span>
@@ -2759,7 +2839,27 @@ we want to batch ten seconds worth of events together in
order to reduce the num
<span class=n>elementsState</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span>
<span class=n>isTimerSetState</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h2 id=splittable-dofns>12.
Splittable <code>DoFns</code></h2><p>A Splittable <code>DoFn</code> (SDF)
enables users to create modular components containing I/Os (and some advanced
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div
class=highlight><pre class=chroma><code class=language-py data-lang=py><span
class=k>class</span> <span class=nc>BufferDoFn</span><span
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+ <span class=n>BUFFER</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>EventCoder</span><span class=p>())</span>
+ <span class=n>IS_TIMER_SET</span> <span class=o>=</span> <span
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span
class=s1>'is_timer_set'</span><span class=p>,</span> <span
class=n>BooleanCoder</span><span class=p>())</span>
+ <span class=n>OUTPUT</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'output'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>REAL_TIME</span><span class=p>)</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>element</span><span class=p>,</span>
+ <span class=nb>buffer</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>BUFFER</span><span class=p>),</span>
+ <span class=n>is_timer_set</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>IS_TIMER_SET</span><span class=p>),</span>
+ <span class=n>timer</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span
class=p>(</span><span class=n>OUTPUT</span><span class=p>)):</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=n>element</span><span
class=p>)</span>
+ <span class=k>if</span> <span class=ow>not</span> <span
class=n>is_timer_set</span><span class=o>.</span><span class=n>read</span><span
class=p>():</span>
+ <span class=n>timer</span><span class=o>.</span><span
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span
class=o>.</span><span class=n>now</span><span class=p>()</span> <span
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span
class=n>seconds</span><span class=o>=</span><span class=mi>10</span><span
class=p>))</span>
+ <span class=n>is_timer_set</span><span class=o>.</span><span
class=n>write</span><span class=p>(</span><span class=bp>True</span><span
class=p>)</span>
+
+ <span class=nd>@on_timer</span><span class=p>(</span><span
class=n>OUTPUT</span><span class=p>)</span>
+ <span class=k>def</span> <span class=nf>output_callback</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=nb>buffer</span><span class=o>=</span><span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>BUFFER</span><span class=p>),</span>
+ <span class=n>is_timer_set</span><span
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>IS_TIMER_SET</span><span class=p>)):</span>
+ <span class=n>send_rpc</span><span class=p>(</span><span
class=nb>list</span><span class=p>(</span><span class=nb>buffer</span><span
class=o>.</span><span class=n>read</span><span class=p>()))</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=n>is_timer_set</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span></code></pre></div></div><h2
id=splittable-dofns>12. Splittable <code>DoFns</code></h2><p>A Splittable
<code>DoFn</code> (SDF) enables users to create modular components containing
I/Os (and some advanced
<a href="https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv">non-I/O
use cases</a>). Having modular
I/O components that can be connected to each other simplifies typical patterns
that users want.
For example, a popular use case is to read filenames from a message queue
followed by parsing those
diff --git a/website/generated-content/sitemap.xml
b/website/generated-content/sitemap.xml
index cc6607f..4bbf3a9 100644
--- a/website/generated-content/sitemap.xml
+++ b/website/generated-content/sitemap.xml
@@ -1 +1 @@
-<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/kafka-to-pubsub-example/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url>
[...]
\ No newline at end of file
+<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/kafka-to-pubsub-example/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url>
[...]
\ No newline at end of file
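The per-key logic of the Python `JoinDoFn` added in this commit can be easier to follow when run in isolation. Below is a minimal, framework-free Python model of the same join-with-timeout pattern, written so it runs without Apache Beam. `Event`, `KeyState`, `process` and `advance_watermark` are hypothetical names, and a single integer stands in for the event-time watermark; it is a sketch of the pattern under those assumptions, not Beam's API.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

JOIN_TIMEOUT = 3600  # seconds; mirrors Duration(seconds=3600) in JoinDoFn


@dataclass
class Event:
    # Hypothetical event shape: type is 'view' or 'click'; ts is event time.
    type: str
    ts: int


@dataclass
class KeyState:
    # Per-key state mirroring VIEW_STATE_SPEC, CLICK_STATE_SPEC, MAX_TIMESTAMP.
    view: Optional[Event] = None
    click: Optional[Event] = None
    max_ts: Optional[int] = None
    gc_at: Optional[int] = None  # timestamp of the pending GC timer, if any

    def clear_state(self) -> None:
        # Like JoinDoFn, clearing state does not cancel a pending GC timer.
        self.view = None
        self.click = None
        self.max_ts = None


def process(state: KeyState, event: Event) -> List[Tuple[Event, Event]]:
    """Store the event; emit a (view, click) pair once both have been seen."""
    if event.type == 'view':
        state.view = event
    else:
        state.click = event
    if state.view is not None and state.click is not None:
        joined = [(state.view, state.click)]
        state.clear_state()
        return joined
    # Incomplete join: (re)set the GC timer one hour past the max timestamp.
    state.max_ts = event.ts if state.max_ts is None else max(state.max_ts, event.ts)
    state.gc_at = state.max_ts + JOIN_TIMEOUT
    return []


def advance_watermark(state: KeyState, watermark: int) -> None:
    """Fire the GC timer once the watermark reaches its timestamp."""
    if state.gc_at is not None and watermark >= state.gc_at:
        state.clear_state()
        state.gc_at = None
```

As in `JoinDoFn`, a GC timer can remain pending after a successful join; if it fires later, it only clears state that is already empty, and any new event for the key simply resets it.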