This is an automated email from the ASF dual-hosted git repository.

git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new ab8dcc0  Publishing website 2021/01/26 06:03:13 at commit dde38b6
ab8dcc0 is described below

commit ab8dcc0cceaa215064010e5e1a77be3c37ee8c7e
Author: jenkins <[email protected]>
AuthorDate: Tue Jan 26 06:03:13 2021 +0000

    Publishing website 2021/01/26 06:03:13 at commit dde38b6
---
 website/generated-content/documentation/index.xml  | 138 ++++++++++++++++++---
 .../documentation/programming-guide/index.html     | 134 +++++++++++++++++---
 website/generated-content/sitemap.xml              |   2 +-
 3 files changed, 239 insertions(+), 35 deletions(-)

diff --git a/website/generated-content/documentation/index.xml 
b/website/generated-content/documentation/index.xml
index 04e9667..4faffd6 100644
--- a/website/generated-content/documentation/index.xml
+++ b/website/generated-content/documentation/index.xml
@@ -6455,6 +6455,7 @@ read and modified inside the DoFn&amp;rsquo;s 
&lt;code>@ProcessElement&lt;/code>
 registered, then Beam will automatically infer the coder for the state value. 
Otherwise, a coder can be explicitly
 specified when creating the ValueState. For example, the following ParDo 
creates a single state variable that
 accumulates the number of elements seen.&lt;/p>
+&lt;p>Note: &lt;code>ValueState&lt;/code> is called 
&lt;code>ReadModifyWriteState&lt;/code> in the Python SDK.&lt;/p>
 &lt;div class=language-java>
 &lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-java" data-lang="java">&lt;span 
class="n">PCollection&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">String&lt;/span>&lt;span class="o">,&lt;/span> &lt;span 
class="n">ValueT&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/span> 
&lt;span class="n">perUser&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">readPerUser&lt;/ [...]
 &lt;span class="n">perUser&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span 
class="o">,&lt;/span [...]
@@ -6476,6 +6477,16 @@ accumulates the number of elements seen.&lt;/p>
 &lt;span class="o">...&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">ReadModifyWriteStateDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;span class="n">STATE_SPEC&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;num_elements&amp;#39;&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">VarIntCoder&lt;/span>&lt;span 
class="p">())&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">STATE_SPEC&lt;/span>&lt;span 
class="p">)):&lt;/span>
+&lt;span class="c1"># Read the number of elements seen so far for this user 
key.&lt;/span>
+&lt;span class="n">current_value&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">()&lt;/span> &lt;span 
class="ow">or&lt;/span> &lt;span class="mi">0&lt;/span>
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">write&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">current_value&lt;/span>&lt;span class="o">+&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;state 
pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ReadModifyWriteStateDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="combiningstate">CombiningState&lt;/h4>
 &lt;p>&lt;code>CombiningState&lt;/code> allows you to create a state object 
that is updated using a Beam combiner. For example, the previous
 &lt;code>ValueState&lt;/code> example could be rewritten to use 
&lt;code>CombiningState&lt;/code>:&lt;/p>
@@ -6489,10 +6500,10 @@ accumulates the number of elements seen.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">CombiningStateDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">CombiningStateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">SUM_TOTAL&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">CombiningValueStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;total&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="nb">sum&lt;/span>&lt;span class="p">)&lt;/span>
-&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">SoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">SUM_TOTAL&lt;/span>&lt;span 
class="p">)):&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">SUM_TOTAL&lt;/span>&lt;span 
class="p">)):&lt;/span>
 &lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">)&lt;/span>
 &lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
 &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Combine state 
pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">CombiningStateDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
@@ -6520,8 +6531,8 @@ bags larger than available memory.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">BagStateDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">BagStateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element_pair&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">ALL_ELEMENTS&lt;/span>&lt;span 
class="p" [...]
 &lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">element_pair&lt;/span>&lt;span class="p">[&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">])&lt;/span>
@@ -6553,6 +6564,9 @@ runner can prefetch all of the states necessary. For 
example:&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="n">This&lt;/span> &lt;span 
class="ow">is&lt;/span> &lt;span class="ow">not&lt;/span> &lt;span 
class="n">supported&lt;/span> &lt;span class="n">yet&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">see&lt;/span> &lt;span 
class="n">BEAM&lt;/span>&lt;span class="o">-&lt;/span>&lt;span 
class="mf">11506.&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;p>If however there are code paths in which the states are not fetched, 
then annotating with @AlwaysFetched will add
 unnecessary fetching for those paths. In this case, the readLater method 
allows the runner to know that the state will
 be read in the future, allowing multiple state reads to be batched 
together.&lt;/p>
@@ -6606,8 +6620,8 @@ allows for event-time aggregations.&lt;/p>
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">EventTimerDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">EventTimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
@@ -6644,8 +6658,8 @@ to the current time. In Java, the 
&lt;code>Timer.offset&lt;/code> and &lt;code>T
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">ProcessingTimerDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">ProcessingTimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">REAL_TIME&lt;/span>&lt;span class="p">)&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
@@ -6663,11 +6677,13 @@ to the current time. In Java, the 
&lt;code>Timer.offset&lt;/code> and &lt;code>T
 &lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;ProcessingTime 
timer pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> 
&lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ProcessingTimerDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
 &lt;h4 id="dynamic-timer-tags">11.3.3. Dynamic timer tags&lt;/h4>
-&lt;p>Beam also supports dynamically setting a timer tag using 
&lt;code>TimerMap&lt;/code>. This allows for setting multiple different timers
+&lt;p>Beam also supports dynamically setting a timer tag using 
&lt;code>TimerMap&lt;/code> in the Java SDK. This allows for setting multiple 
different timers
 in a &lt;code>DoFn&lt;/code> and allowing for the timer tags to be dynamically 
chosen - e.g. based on data in the input elements. A
 timer with a specific tag can only be set to a single timestamp, so setting 
the timer again has the effect of
 overwriting the previous expiration time for the timer with that tag. Each 
&lt;code>TimerMap&lt;/code> is identified with a timer family
 id, and timers in different timer families are independent.&lt;/p>
+&lt;p>In the Python SDK, a dynamic timer tag can be specified when calling 
&lt;code>set()&lt;/code> or &lt;code>clear()&lt;/code>. If not specified, the timer
+tag defaults to an empty string.&lt;/p>
 &lt;div class=language-java>
 &lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-java" data-lang="java">&lt;span 
class="n">PCollection&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">String&lt;/span>&lt;span class="o">,&lt;/span> &lt;span 
class="n">ValueT&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/span> 
&lt;span class="n">perUser&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">readPerUser&lt;/ [...]
 &lt;span class="n">perUser&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span 
class="o">,&lt;/span [...]
@@ -6684,8 +6700,28 @@ id, and timers in different timer families are 
independent.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="n">To&lt;/span> 
&lt;span class="n">be&lt;/span> &lt;span class="n">supported&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">See&lt;/span> &lt;span 
class="n">BEAM&lt;/span>&lt;span class="o">-&lt;/span>&lt;span 
class="mi">9602&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">TimerDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">REAL_TIME&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
+&lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">timer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">element_pair&lt;/span>&lt;span class="p">[&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">])&lt;/span>
+&lt;span class="c1"># Set a timer to go off 30 seconds in the future with 
dynamic timer tag &amp;#39;first_timer&amp;#39;.&lt;/span>
+&lt;span class="c1"># And set a timer to go off 60 seconds in the future with 
dynamic timer tag &amp;#39;second_timer&amp;#39;.&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">Timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">now&lt;/span>&lt;span class="p">()&lt;/span> &lt;span 
class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="mi">30&lt;/span>&lt;span 
class="p">),&lt;/span> &lt;sp [...]
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">Timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">now&lt;/span>&lt;span class="p">()&lt;/span> &lt;span 
class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="mi">60&lt;/span>&lt;span 
class="p">),&lt;/span> &lt;sp [...]
+&lt;span class="c1"># Note that a timer can also be explicitly cleared if 
previously set with a dynamic timer tag:&lt;/span>
+&lt;span class="c1"># timer.clear(dynamic_timer_tag=...)&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span 
class="nf">expiry_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span 
class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span> &lt;span 
class="n">timer_tag&lt;/span>&lt;span [...]
+&lt;span class="c1"># Process the timer. The dynamic timer tag associated with 
the expiring timer can be read back with DoFn.DynamicTimerTagParam.&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="k">yield&lt;/span> &lt;span class="p">(&lt;/span>&lt;span 
class="n">timer_tag&lt;/span>&lt;span class="p">,&lt;/span> &lt;span 
class="s1">&amp;#39;fired&amp;#39;&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;ProcessingTime 
timer pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> 
&lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TimerDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
 &lt;h4 id="timer-output-timestamps">11.3.4. Timer output timestamps&lt;/h4>
 &lt;p>By default, event-time timers will hold the output watermark of the 
&lt;code>ParDo&lt;/code> to the timestamp of the timer. This means
@@ -6778,6 +6814,9 @@ past the timestamp of the minimum element. The following 
code demonstrates this.
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="n">Timer&lt;/span> &lt;span 
class="n">output&lt;/span> &lt;span class="n">timestamps&lt;/span> &lt;span 
class="ow">is&lt;/span> &lt;span class="ow">not&lt;/span> &lt;span 
class="n">yet&lt;/span> &lt;span class="n">supported&lt;/span> &lt;span 
class="ow">in&lt;/span> &lt;span class="n">Python&lt;/span> &lt;span 
class="n">SDK&lt;/span>&lt;span class="o">.&lt;/span> &lt;span class="n [...]
+&lt;/div>
 &lt;h3 id="garbage-collecting-state">11.4. Garbage collecting state&lt;/h3>
 &lt;p>Per-key state needs to be garbage collected, or eventually the 
increasing size of state may negatively impact
 performance. There are two common strategies for garbage collecting 
state.&lt;/p>
@@ -6802,8 +6841,8 @@ garbage-collection strategy.&lt;/p>
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">StateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">StateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
 &lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span>
@@ -6851,8 +6890,8 @@ This can be done by updating a timer that garbage 
collects state. For example&lt
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
-&lt;div class=language-python>
-&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">UserDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">UserDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
 &lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;state&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
 &lt;span class="n">MAX_TIMESTAMP&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">CombiningValueStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;max_timestamp_seen&amp;#39;&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="nb">max&lt;/span>&lt;span 
class="p">)&lt;/span>
 &lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;gc-timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
@@ -6896,7 +6935,7 @@ event before the view event. The one hour join timeout 
should be based on event
 &lt;/span>&lt;span class="c1">&lt;/span>&lt;span 
class="n">PCollection&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">String&lt;/span>&lt;span class="o">,&lt;/span> &lt;span 
class="n">Event&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">eventsPerLinkId&lt;/span> &lt;span class="o">=&lt;/span>
 &lt;span class="n">readEvents&lt;/span>&lt;span class="o">()&lt;/span>
 &lt;span class="o">.&lt;/span>&lt;span class="na">apply&lt;/span>&lt;span 
class="o">(&lt;/span>&lt;span class="n">WithKeys&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="na">of&lt;/span>&lt;span 
class="o">(&lt;/span>&lt;span class="n">Event&lt;/span>&lt;span 
class="o">::&lt;/span>&lt;span class="n">getLinkId&lt;/span>&lt;span 
class="o">).&lt;/span>&lt;span class="na">withKeyType&lt;/span>&lt;span 
class="o">(&lt;/span>&lt;span class="n">TypeDescriptors&lt;/span>&lt;span 
class="o"> [...]
-&lt;span class="n">perUser&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span 
class="o">,&lt;/span [...]
+&lt;span class="n">eventsPerLinkId&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="na">apply&lt;/span>&lt;span 
class="o">(&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="na">of&lt;/span>&lt;span 
class="o">(&lt;/span>&lt;span class="k">new&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">String&lt;/span>&lt;span class="o">,& [...]
 &lt;span class="c1">// Store the view event.
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span 
class="nd">@StateId&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="s">&amp;#34;view&amp;#34;&lt;/span>&lt;span class="o">)&lt;/span> 
&lt;span class="kd">private&lt;/span> &lt;span class="kd">final&lt;/span> 
&lt;span class="n">StateSpec&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">ValueState&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">Event&lt;/span>&lt;span 
class="o">&amp;gt;&amp;gt;&lt;/sp [...]
 &lt;span class="c1">// Store the click event.
@@ -6951,6 +6990,50 @@ event before the view event. The one hour join timeout 
should be based on event
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">JoinDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="c1"># stores the view event.&lt;/span>
+&lt;span class="n">VIEW_STATE_SPEC&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;view&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">EventCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="c1"># stores the click event.&lt;/span>
+&lt;span class="n">CLICK_STATE_SPEC&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;click&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">EventCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="c1"># The maximum element timestamp value seen so 
far.&lt;/span>
+&lt;span class="n">MAX_TIMESTAMP&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">CombiningValueStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;max_timestamp_seen&amp;#39;&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="nb">max&lt;/span>&lt;span 
class="p">)&lt;/span>
+&lt;span class="c1"># Timer that fires when an hour goes by with an incomplete 
join.&lt;/span>
+&lt;span class="n">GC_TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;gc&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
+&lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">VIEW_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">CLICK_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">MAX_TIMESTAMP&lt;/span>&lt;span 
class="p">),&lt;/span>
+&lt;span class="n">ts&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimestampParam&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">gc&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">GC_TIMER&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="n">event&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">element&lt;/span>
+&lt;span class="k">if&lt;/span> &lt;span class="n">event&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">type&lt;/span> &lt;span 
class="o">==&lt;/span> &lt;span 
class="s1">&amp;#39;view&amp;#39;&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">write&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">event&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">else&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">write&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">event&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">previous_view&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">previous_click&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="c1"># We&amp;#39;ve seen both a view and a click. Output a 
joined event and clear state.&lt;/span>
+&lt;span class="k">if&lt;/span> &lt;span class="n">previous_view&lt;/span> 
&lt;span class="ow">and&lt;/span> &lt;span 
class="n">previous_click&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="k">yield&lt;/span> &lt;span class="p">(&lt;/span>&lt;span 
class="n">previous_view&lt;/span>&lt;span class="p">,&lt;/span> &lt;span 
class="n">previous_click&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span 
class="p">()&lt;/span>
+&lt;span class="k">else&lt;/span>&lt;span class="p">:&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">add&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">ts&lt;/span>&lt;span 
class="p">)&lt;/span>
+&lt;span class="n">gc&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">max_timestamp_seen&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">()&lt;/span> &lt;span 
class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="mi">3600&lt;/span>&lt;span 
class="p">))&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">GC_TIMER&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span 
class="nf">gc_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">VIEW_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">CLICK_STATE_SPEC&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">MAX_TIMESTAMP&lt;/span>&lt;span 
class="p">)):&lt;/span>
+&lt;span class="n">view&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">click&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">max_timestamp_seen&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span 
class="p">()&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span 
class="s1">&amp;#39;EventsPerLinkId&amp;#39;&lt;/span> &lt;span 
class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerLinkEvents&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Join 
DoFn&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">JoinDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="batching-rpcs">11.5.2. Batching RPCs&lt;/h4>
 &lt;p>In this example, input elements are being forwarded to an external RPC 
service. The RPC accepts batch requests -
 multiple events for the same user can be batched in a single RPC call. Since 
this RPC service also imposes rate limits,
@@ -6987,6 +7070,27 @@ we want to batch ten seconds worth of events together in 
order to reduce the num
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-py>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code class="language-py" 
data-lang="py">&lt;span class="k">class&lt;/span> &lt;span 
class="nc">BufferDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="n">BUFFER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">EventCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">IS_TIMER_SET&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">ReadModifyWriteStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;is_timer_set&amp;#39;&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">BooleanCoder&lt;/span>&lt;span 
class="p">())&lt;/span>
+&lt;span class="n">OUTPUT&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;output&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">REAL_TIME&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">BUFFER&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">IS_TIMER_SET&lt;/span>&lt;span 
class="p">),&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">OUTPUT&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">element&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">if&lt;/span> &lt;span class="ow">not&lt;/span> &lt;span 
class="n">is_timer_set&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">():&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">Timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">now&lt;/span>&lt;span class="p">()&lt;/span> &lt;span 
class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="mi">10&lt;/span>&lt;span 
class="p">))&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">write&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">True&lt;/span>&lt;span 
class="p">)&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">OUTPUT&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span 
class="nf">output_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">BUFFER&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">IS_TIMER_SET&lt;/span>&lt;span 
class="p">)):&lt;/span>
+&lt;span class="n">send_rpc&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="nb">list&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">()))&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">is_timer_set&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span 
class="p">()&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h2 id="splittable-dofns">12. Splittable &lt;code>DoFns&lt;/code>&lt;/h2>
 &lt;p>A Splittable &lt;code>DoFn&lt;/code> (SDF) enables users to create 
modular components containing I/Os (and some advanced
 &lt;a href="https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv">non 
I/O use cases&lt;/a>). Having modular
diff --git 
a/website/generated-content/documentation/programming-guide/index.html 
b/website/generated-content/documentation/programming-guide/index.html
index 637506d..b1d044c 100644
--- a/website/generated-content/documentation/programming-guide/index.html
+++ b/website/generated-content/documentation/programming-guide/index.html
@@ -2280,7 +2280,7 @@ to other nodes in the graph. A <code>DoFn</code> can 
declare multiple state vari
 read and modified inside the DoFn&rsquo;s <code>@ProcessElement</code> or 
<code>@OnTimer</code> methods. If the type of the ValueState has a coder
 registered, then Beam will automatically infer the coder for the state value. 
Otherwise, a coder can be explicitly
 specified when creating the ValueState. For example, the following ParDo 
creates a single state variable that
-accumulates the number of elements seen.</p><div class=language-java><div 
class=highlight><pre class=chroma><code class=language-java 
data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span 
class=n>perUser</span> <span class=o>=</span> <span 
class=n>readPerUser</span><span class=o>();</span>
+accumulates the number of elements seen.</p><p>Note: <code>ValueState</code> 
is called <code>ReadModifyWriteState</code> in the Python SDK.</p><div 
class=language-java><div class=highlight><pre class=chroma><code 
class=language-java data-lang=java><span class=n>PCollection</span><span 
class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span 
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span 
class=o>&gt;&gt;</span> <span class=n>perUser</span> <sp [...]
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span 
class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@StateId</span><span class=o>(</span><span 
class=s>&#34;state&#34;</span><span class=o>)</span> <span 
class=kd>private</span> <span class=kd>final</span> <span 
class=n>StateSpec</span><span class=o>&lt;</span><span 
class=n>ValueState</span><span class=o>&lt;</span><span 
class=n>Integer</span><span class=o>&gt;&gt;</span> <span 
class=n>numElements</span> <span class=o>=</span> <span 
class=n>StateSpecs</span><span class=o>.</span><span class=na>value</span><span 
class=o>() [...]
 
@@ -2295,7 +2295,16 @@ accumulates the number of elements seen.</p><div 
class=language-java><div class=
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span 
class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@StateId</span><span class=o>(</span><span 
class=s>&#34;state&#34;</span><span class=o>)</span> <span 
class=kd>private</span> <span class=kd>final</span> <span 
class=n>StateSpec</span><span class=o>&lt;</span><span 
class=n>ValueState</span><span class=o>&lt;</span><span 
class=n>MyType</span><span class=o>&gt;&gt;</span> <span 
class=n>numElements</span> <span class=o>=</span> <span 
class=n>StateSpecs</span><span class=o>.</span><span class=na>value</span><span 
class=o>(</ [...]
                  <span class=o>...</span>
-<span class=o>}));</span></code></pre></div></div><h4 
id=combiningstate>CombiningState</h4><p><code>CombiningState</code> allows you 
to create a state object that is updated using a Beam combiner. For example, 
the previous
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>ReadModifyWriteStateDoFn</span><span 
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+  <span class=n>STATE_SPEC</span> <span class=o>=</span> <span 
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span 
class=s1>&#39;num_elements&#39;</span><span class=p>,</span> <span 
class=n>VarIntCoder</span><span class=p>())</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=n>element</span><span class=p>,</span> <span class=n>state</span><span 
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>STATE_SPEC</span><span class=p>)):</span>
+    <span class=c1># Read the number of elements seen so far for this user 
key.</span>
+    <span class=n>current_value</span> <span class=o>=</span> <span 
class=n>state</span><span class=o>.</span><span class=n>read</span><span 
class=p>()</span> <span class=ow>or</span> <span class=mi>0</span>
+    <span class=n>state</span><span class=o>.</span><span 
class=n>write</span><span class=p>(</span><span 
class=n>current_value</span><span class=o>+</span><span class=mi>1</span><span 
class=p>)</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;state pardo&#39;</span> 
<span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>ReadModifyWriteStateDoFn</span><span 
class=p>()))</span></code></pre></div></div><h4 
id=combiningstate>CombiningState</h4><p><code>CombiningState</code> allows you 
to create a state object that is updated using a Beam combiner. For example, 
the previous
 <code>ValueState</code> example could be rewritten to use 
<code>CombiningState</code></p><div class=language-java><div 
class=highlight><pre class=chroma><code class=language-java 
data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span 
class=n>perUser</span> <span class=o>=</span> <span 
class=n>readPerUser</span><s [...]
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span 
class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@StateId</span><span class=o>(</span><span 
class=s>&#34;state&#34;</span><span class=o>)</span> <span 
class=kd>private</span> <span class=kd>final</span> <span 
class=n>StateSpec</span><span class=o>&lt;</span><span 
class=n>CombiningState</span><span class=o>&lt;</span><span 
class=n>Integer</span><span class=o>,</span> <span class=kt>int</span><span 
class=o>[],</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> 
<span class=n>numElements</span> <span class=o> [...]
@@ -2304,10 +2313,10 @@ accumulates the number of elements seen.</p><div 
class=language-java><div class=
   <span class=nd>@ProcessElement</span> <span class=kd>public</span> <span 
class=kt>void</span> <span class=nf>process</span><span class=o>(</span><span 
class=nd>@StateId</span><span class=o>(</span><span 
class=s>&#34;state&#34;</span><span class=o>)</span> <span 
class=n>ValueState</span><span class=o>&lt;</span><span 
class=n>Integer</span><span class=o>&gt;</span> <span class=n>state</span><span 
class=o>)</span> <span class=o>{</span>
     <span class=n>state</span><span class=o>.</span><span 
class=na>add</span><span class=o>(</span><span class=n>1</span><span 
class=o>);</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>CombiningStateDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>CombiningStateDoFn</span><span 
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>SUM_TOTAL</span> <span class=o>=</span> <span 
class=n>CombiningValueStateSpec</span><span class=p>(</span><span 
class=s1>&#39;total&#39;</span><span class=p>,</span> <span 
class=nb>sum</span><span class=p>)</span>
 
-  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=n>element</span><span class=p>,</span> <span class=n>state</span><span 
class=o>=</span><span class=n>SoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>SUM_TOTAL</span><span class=p>)):</span>
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=n>element</span><span class=p>,</span> <span class=n>state</span><span 
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>SUM_TOTAL</span><span class=p>)):</span>
     <span class=n>state</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=mi>1</span><span 
class=p>)</span>
 
 <span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
@@ -2330,7 +2339,7 @@ bags larger than available memory.</p><div 
class=language-java><div class=highli
       <span class=n>state</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>  <span class=c1>// Clear the 
state for this key.
 </span><span class=c1></span>    <span class=o>}</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>BagStateDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>BagStateDoFn</span><span 
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
 
   <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=n>element_pair</span><span class=p>,</span> <span 
class=n>state</span><span class=o>=</span><span class=n>DoFn</span><span 
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span 
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
@@ -2358,7 +2367,7 @@ runner can prefetch all of the states necessary. For 
example:</p><div class=lang
     <span class=n>state2</span><span class=o>.</span><span 
class=na>read</span><span class=o>();</span>
     <span class=n>state3</span><span class=o>.</span><span 
class=na>read</span><span class=o>();</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><p>If however there are code 
paths in which the states are not fetched, then annotating with @AlwaysFetched 
will add
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=n>This</span> <span class=ow>is</span> <span class=ow>not</span> <span 
class=n>supported</span> <span class=n>yet</span><span class=p>,</span> <span 
class=n>see</span> <span class=n>BEAM</span><span class=o>-</span><span 
class=mf>11506.</span></code></pre></div></div><p>If however there are code 
paths in which the states are not  [...]
 unnecessary fetching for those paths. In this case, the readLater method 
allows the runner to know that the state will
 be read in the future, allowing multiple state reads to be batched 
together.</p><div class=language-java><div class=highlight><pre 
class=chroma><code class=language-java data-lang=java><span 
class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span 
class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span 
class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> 
<span class=o>=</span> <span class=n>readPerUser</span><span class [...]
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span 
class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
@@ -2404,7 +2413,7 @@ allows for event-time aggregations.</p><div 
class=language-java><div class=highl
    <span class=nd>@OnTimer</span><span class=o>(</span><span 
class=s>&#34;timer&#34;</span><span class=o>)</span> <span 
class=kd>public</span> <span class=kt>void</span> <span 
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
       <span class=c1>//Process timer.
 </span><span class=c1></span>   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>EventTimerDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>EventTimerDoFn</span><span 
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
   <span class=n>TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;timer&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>WATERMARK</span><span class=p>)</span>
 
@@ -2438,7 +2447,7 @@ to the current time. In Java, the 
<code>Timer.offset</code> and <code>Timer.setR
    <span class=nd>@OnTimer</span><span class=o>(</span><span 
class=s>&#34;timer&#34;</span><span class=o>)</span> <span 
class=kd>public</span> <span class=kt>void</span> <span 
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
       <span class=c1>//Process timer.
 </span><span class=c1></span>   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>ProcessingTimerDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>ProcessingTimerDoFn</span><span 
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
   <span class=n>TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;timer&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>REAL_TIME</span><span class=p>)</span>
 
@@ -2456,11 +2465,12 @@ to the current time. In Java, the 
<code>Timer.offset</code> and <code>Timer.setR
     <span class=n>state</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
 
 <span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
-       <span class=o>|</span> <span class=s1>&#39;ProcessingTime timer 
pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>ProcessingTimerDoFn</span><span 
class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3. 
Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag 
using <code>TimerMap</code>. This allows for setting multiple different timers
+       <span class=o>|</span> <span class=s1>&#39;ProcessingTime timer 
pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>ProcessingTimerDoFn</span><span 
class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3. 
Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag 
using <code>TimerMap</code> in the Java SDK. This allows for setting multiple 
[...]
 in a <code>DoFn</code> and allowing for the timer tags to be dynamically 
chosen - e.g. based on data in the input elements. A
 timer with a specific tag can only be set to a single timestamp, so setting 
the timer again has the effect of
 overwriting the previous expiration time for the timer with that tag. Each 
<code>TimerMap</code> is identified with a timer family
-id, and timers in different timer families are independent.</p><div 
class=language-java><div class=highlight><pre class=chroma><code 
class=language-java data-lang=java><span class=n>PCollection</span><span 
class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span 
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span 
class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> 
<span class=n>readPerUser</span><span class=o>();</span>
+id, and timers in different timer families are independent.</p><p>In the 
Python SDK, a dynamic timer tag can be specified while calling 
<code>set()</code> or <code>clear()</code>. By default, the timer
+tag is an empty string if not specified.</p><div class=language-java><div 
class=highlight><pre class=chroma><code class=language-java 
data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span 
class=n>perUser</span> <span class=o>=</span> <span 
class=n>readPerUser</span><span class=o>();</span>
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span 
class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@TimerFamily</span><span class=o>(</span><span 
class=s>&#34;actionTimers&#34;</span><span class=o>)</span> <span 
class=kd>private</span> <span class=kd>final</span> <span 
class=n>TimerSpec</span> <span class=n>timer</span> <span class=o>=</span>
     <span class=n>TimerSpecs</span><span class=o>.</span><span 
class=na>timerMap</span><span class=o>(</span><span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=na>EVENT_TIME</span><span class=o>);</span>
@@ -2475,7 +2485,30 @@ id, and timers in different timer families are 
independent.</p><div class=langua
    <span class=nd>@OnTimerFamily</span><span class=o>(</span><span 
class=s>&#34;actionTimers&#34;</span><span class=o>)</span> <span 
class=kd>public</span> <span class=kt>void</span> <span 
class=nf>onTimer</span><span class=o>(</span><span class=nd>@TimerId</span> 
<span class=n>String</span> <span class=n>timerId</span><span class=o>)</span> 
<span class=o>{</span>
      <span class=n>LOG</span><span class=o>.</span><span 
class=na>info</span><span class=o>(</span><span class=s>&#34;Timer fired with 
id &#34;</span> <span class=o>+</span> <span class=n>timerId</span><span 
class=o>);</span>
    <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=n>To</span> <span 
class=n>be</span> <span class=n>supported</span><span class=p>,</span> <span 
class=n>See</span> <span class=n>BEAM</span><span class=o>-</span><span 
class=mi>9602</span></code></pre></div></div><h4 
id=timer-output-timestamps>11.3.4. Timer output timestamps</h4><p>By default, 
event-time timers will ho [...]
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>TimerDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+  <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
+  <span class=n>TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;timer&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>REAL_TIME</span><span class=p>)</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+              <span class=n>element_pair</span><span class=p>,</span>
+              <span class=nb>buffer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+              <span class=n>timer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span 
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+    <span class=nb>buffer</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span 
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+    <span class=c1># Set a timer to go off 30 seconds in the future with 
dynamic timer tag &#39;first_timer&#39;.</span>
+    <span class=c1># And set a timer to go off 60 seconds in the future with 
dynamic timer tag &#39;second_timer&#39;.</span>
+    <span class=n>timer</span><span class=o>.</span><span 
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span 
class=o>.</span><span class=n>now</span><span class=p>()</span> <span 
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span 
class=n>seconds</span><span class=o>=</span><span class=mi>30</span><span 
class=p>),</span> <span class=n>dynamic_timer_tag</span><span 
class=o>=</span><span class=s1>&#39;first_timer&#39;</span><span 
class=p>)</span>
+    <span class=n>timer</span><span class=o>.</span><span 
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span 
class=o>.</span><span class=n>now</span><span class=p>()</span> <span 
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span 
class=n>seconds</span><span class=o>=</span><span class=mi>60</span><span 
class=p>),</span> <span class=n>dynamic_timer_tag</span><span 
class=o>=</span><span class=s1>&#39;second_timer&#39;</span><span 
class=p>)</span>
+    <span class=c1># Note that a timer can also be explicitly cleared if 
previously set with a dynamic timer tag:</span>
+    <span class=c1># timer.clear(dynamic_timer_tag=...)</span>
+
+  <span class=nd>@on_timer</span><span class=p>(</span><span 
class=n>TIMER</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>expiry_callback</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span 
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span 
class=n>ALL_ELEMENTS</span><span class=p>),</span> <span 
class=n>timer_tag</span><span class=o>=</span><span class=n>DoFn</span><span 
class=o>.</span><span class=n>DynamicTimerTagParam</span><span cl [...]
+    <span class=c1># Process timer, the dynamic timer tag associated with 
expiring timer can be read back with DoFn.DynamicTimerTagParam.</span>
+    <span class=nb>buffer</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    <span class=k>yield</span> <span class=p>(</span><span 
class=n>timer_tag</span><span class=p>,</span> <span 
class=s1>&#39;fired&#39;</span><span class=p>)</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;ProcessingTime timer 
pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>TimerDoFn</span><span class=p>()))</span></code></pre></div></div><h4 
id=timer-output-timestamps>11.3.4. Timer output timestamps</h4><p>By default, 
event-time timers will hold the output watermark of the <code>ParDo</code> to 
the timestamp of the timer. This means
 that if a timer is set to 12pm, any windowed aggregations or event-time timers 
later in the pipeline graph that finish
 after 12pm will not expire. The timestamp of the timer is also the default 
output timestamp for the timer callback. This
 means that any elements output from the onTimer method will have a timestamp 
equal to the timestamp of the timer firing.
@@ -2562,7 +2595,7 @@ past the timestamp of the minimum element. The following 
code demonstrates this.
     <span class=c1>// Note that the timer has now fired.
 </span><span class=c1></span>    <span class=n>timerTimestamp</span><span 
class=o>.</span><span class=na>clear</span><span class=o>();</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h3 
id=garbage-collecting-state>11.4. Garbage collecting state</h3><p>Per-key state 
needs to be garbage collected, or eventually the increasing size of state may 
negatively impact
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=n>Timer</span> <span class=n>output</span> <span 
class=n>timestamps</span> <span class=ow>is</span> <span class=ow>not</span> 
<span class=n>yet</span> <span class=n>supported</span> <span 
class=ow>in</span> <span class=n>Python</span> <span class=n>SDK</span><span 
class=o>.</span> <span class=n>See</span> <span class=n>BEAM</span [...]
 performance. There are two common strategies for garbage collecting 
state.</p><h5 id=using-windows-for-garbage-collection>11.4.1. <strong>Using 
windows for garbage collection</strong></h5><p>All state and timers for a key 
are scoped to the window it is in. This means that depending on the timestamp of 
the
 input element the ParDo will see different values for the state depending on 
the window that element falls into. In
 addition, once the input watermark passes the end of the window, the runner 
should garbage collect all state for that
@@ -2578,7 +2611,7 @@ garbage-collection strategy.</p><p>For example, given the 
following:</p><div cla
               <span class=c1>// The state is scoped to a calendar day window. 
That means that if the input timestamp ts is after
 </span><span class=c1></span>              <span class=c1>// midnight PST, 
then a new copy of the state will be seen for the next day.
 </span><span class=c1></span>           <span class=o>}</span>
-         <span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span 
class=p>):</span>
+         <span class=o>}));</span></code></pre></div></div><div 
class=language-py><div class=highlight><pre class=chroma><code 
class=language-py data-lang=py><span class=k>class</span> <span 
class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span 
class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
 
   <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span>
@@ -2625,7 +2658,7 @@ This can be done by updating a timer that garbage 
collects state. For example</p
 </span><span class=c1></span>       <span class=n>state</span><span 
class=o>.</span><span class=na>clear</span><span class=o>();</span>
        <span class=n>maxTimestamp</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>
     <span class=o>}</span>
- <span class=o>}</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>UserDoFn</span><span class=p>(</span><span class=n>DoFn</span><span 
class=p>):</span>
+ <span class=o>}</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>UserDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
   <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;state&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
   <span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span 
class=n>CombiningValueStateSpec</span><span class=p>(</span><span 
class=s1>&#39;max_timestamp_seen&#39;</span><span class=p>,</span> <span 
class=nb>max</span><span class=p>)</span>
   <span class=n>TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;gc-timer&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>WATERMARK</span><span class=p>)</span>
@@ -2667,7 +2700,7 @@ event before the view event. The one hour join timeout 
should be based on event
     <span class=n>readEvents</span><span class=o>()</span>
     <span class=o>.</span><span class=na>apply</span><span 
class=o>(</span><span class=n>WithKeys</span><span class=o>.</span><span 
class=na>of</span><span class=o>(</span><span class=n>Event</span><span 
class=o>::</span><span class=n>getLinkId</span><span class=o>).</span><span 
class=na>withKeyType</span><span class=o>(</span><span 
class=n>TypeDescriptors</span><span class=o>.</span><span 
class=na>strings</span><span class=o>()));</span>
 
-<span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>Event</span><span class=o>&gt;,</span> <span 
class=n>JoinedEvent</span><span class=o>&gt;()</span> <span class=o [...]
+<span class=n>eventsPerLinkId</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>Event</span><span class=o>&gt;,</span> <span 
class=n>JoinedEvent</span><span class=o>&gt;()</span> <span [...]
   <span class=c1>// Store the view event.
 </span><span class=c1></span>  <span class=nd>@StateId</span><span 
class=o>(</span><span class=s>&#34;view&#34;</span><span class=o>)</span> <span 
class=kd>private</span> <span class=kd>final</span> <span 
class=n>StateSpec</span><span class=o>&lt;</span><span 
class=n>ValueState</span><span class=o>&lt;</span><span 
class=n>Event</span><span class=o>&gt;&gt;</span> <span 
class=n>viewState</span> <span class=o>=</span> <span 
class=n>StateSpecs</span><span class=o>.</span><span class=na>valu [...]
   <span class=c1>// Store the click event.
@@ -2726,7 +2759,54 @@ event before the view event. The one hour join timeout 
should be based on event
       <span class=n>clickState</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>
       <span class=n>maxTimestampState</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>
     <span class=o>}</span>
- <span class=o>}));</span></code></pre></div></div><h4 
id=batching-rpcs>11.5.2. Batching RPCs</h4><p>In this example, input elements 
are being forwarded to an external RPC service. The RPC accepts batch requests -
+ <span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>JoinDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+  <span class=c1># stores the view event.</span>
+  <span class=n>VIEW_STATE_SPEC</span> <span class=o>=</span> <span 
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span 
class=s1>&#39;view&#39;</span><span class=p>,</span> <span 
class=n>EventCoder</span><span class=p>())</span>
+  <span class=c1># stores the click event.</span>
+  <span class=n>CLICK_STATE_SPEC</span> <span class=o>=</span> <span 
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span 
class=s1>&#39;click&#39;</span><span class=p>,</span> <span 
class=n>EventCoder</span><span class=p>())</span>
+  <span class=c1># The maximum element timestamp value seen so far.</span>
+  <span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span 
class=n>CombiningValueStateSpec</span><span class=p>(</span><span 
class=s1>&#39;max_timestamp_seen&#39;</span><span class=p>,</span> <span 
class=nb>max</span><span class=p>)</span>
+  <span class=c1># Timer that fires when an hour goes by with an incomplete 
join.</span>
+  <span class=n>GC_TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;gc&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>WATERMARK</span><span class=p>)</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+              <span class=n>element</span><span class=p>,</span>
+              <span class=n>view</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>VIEW_STATE_SPEC</span><span class=p>),</span>
+              <span class=n>click</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>CLICK_STATE_SPEC</span><span class=p>),</span>
+              <span class=n>max_timestamp_seen</span><span 
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>MAX_TIMESTAMP</span><span class=p>),</span>
+              <span class=n>ts</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span 
class=n>TimestampParam</span><span class=p>,</span>
+              <span class=n>gc</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span 
class=p>(</span><span class=n>GC_TIMER</span><span class=p>)):</span>
+    <span class=n>event</span> <span class=o>=</span> <span 
class=n>element</span>
+    <span class=k>if</span> <span class=n>event</span><span 
class=o>.</span><span class=n>type</span> <span class=o>==</span> <span 
class=s1>&#39;view&#39;</span><span class=p>:</span>
+      <span class=n>view</span><span class=o>.</span><span 
class=n>write</span><span class=p>(</span><span class=n>event</span><span 
class=p>)</span>
+    <span class=k>else</span><span class=p>:</span>
+      <span class=n>click</span><span class=o>.</span><span 
class=n>write</span><span class=p>(</span><span class=n>event</span><span 
class=p>)</span>
+
+    <span class=n>previous_view</span> <span class=o>=</span> <span 
class=n>view</span><span class=o>.</span><span class=n>read</span><span 
class=p>()</span>
+    <span class=n>previous_click</span> <span class=o>=</span> <span 
class=n>click</span><span class=o>.</span><span class=n>read</span><span 
class=p>()</span>
+
+    <span class=c1># We&#39;ve seen both a view and a click. Output a joined 
event and clear state.</span>
+    <span class=k>if</span> <span class=n>previous_view</span> <span 
class=ow>and</span> <span class=n>previous_click</span><span class=p>:</span>
+      <span class=k>yield</span> <span class=p>(</span><span 
class=n>previous_view</span><span class=p>,</span> <span 
class=n>previous_click</span><span class=p>)</span>
+      <span class=n>view</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+      <span class=n>click</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+      <span class=n>max_timestamp_seen</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    <span class=k>else</span><span class=p>:</span>
+      <span class=n>max_timestamp_seen</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=n>ts</span><span 
class=p>)</span>
+      <span class=n>gc</span><span class=o>.</span><span 
class=n>set</span><span class=p>(</span><span 
class=n>max_timestamp_seen</span><span class=o>.</span><span 
class=n>read</span><span class=p>()</span> <span class=o>+</span> <span 
class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span 
class=o>=</span><span class=mi>3600</span><span class=p>))</span>
+
+  <span class=nd>@on_timer</span><span class=p>(</span><span 
class=n>GC_TIMER</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>gc_callback</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+                  <span class=n>view</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>VIEW_STATE_SPEC</span><span class=p>),</span>
+                  <span class=n>click</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>CLICK_STATE_SPEC</span><span class=p>),</span>
+                  <span class=n>max_timestamp_seen</span><span 
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>MAX_TIMESTAMP</span><span class=p>)):</span>
+    <span class=n>view</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    <span class=n>click</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    <span class=n>max_timestamp_seen</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span 
class=s1>&#39;EventsPerLinkId&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerLinkEvents</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;Join DoFn&#39;</span> <span 
class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span 
class=n>ParDo</span><span class=p>(</span><span class=n>JoinDoFn</span><span 
class=p>()))</span></code></pre></div></div><h4 id=batching-rpcs>11.5.2. 
Batching RPCs</h4><p>In this example, input elements are being forwarded to an 
external RPC service. The RPC accepts batch requests -
 multiple events for the same user can be batched in a single RPC call. Since 
this RPC service also imposes rate limits,
 we want to batch ten seconds worth of events together in order to reduce the 
number of calls.</p><div class=language-java><div class=highlight><pre 
class=chroma><code class=language-java data-lang=java><span 
class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span 
class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span 
class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> 
<span class=o>=</span> <span class=n>readPerUser< [...]
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span 
class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
@@ -2759,7 +2839,27 @@ we want to batch ten seconds worth of events together in 
order to reduce the num
     <span class=n>elementsState</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>
     <span class=n>isTimerSetState</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h2 id=splittable-dofns>12. 
Splittable <code>DoFns</code></h2><p>A Splittable <code>DoFn</code> (SDF) 
enables users to create modular components containing I/Os (and some advanced
+<span class=o>}));</span></code></pre></div></div><div class=language-py><div 
class=highlight><pre class=chroma><code class=language-py data-lang=py><span 
class=k>class</span> <span class=nc>BufferDoFn</span><span 
class=p>(</span><span class=n>DoFn</span><span class=p>):</span>
+  <span class=n>BUFFER</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>EventCoder</span><span class=p>())</span>
+  <span class=n>IS_TIMER_SET</span> <span class=o>=</span> <span 
class=n>ReadModifyWriteStateSpec</span><span class=p>(</span><span 
class=s1>&#39;is_timer_set&#39;</span><span class=p>,</span> <span 
class=n>BooleanCoder</span><span class=p>())</span>
+  <span class=n>OUTPUT</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;output&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>REAL_TIME</span><span class=p>)</span>
+
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+              <span class=n>element</span><span class=p>,</span>
+              <span class=nb>buffer</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>BUFFER</span><span class=p>),</span>
+              <span class=n>is_timer_set</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>IS_TIMER_SET</span><span class=p>),</span>
+              <span class=n>timer</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span 
class=p>(</span><span class=n>OUTPUT</span><span class=p>)):</span>
+    <span class=nb>buffer</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=n>element</span><span 
class=p>)</span>
+    <span class=k>if</span> <span class=ow>not</span> <span 
class=n>is_timer_set</span><span class=o>.</span><span class=n>read</span><span 
class=p>():</span>
+      <span class=n>timer</span><span class=o>.</span><span 
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span 
class=o>.</span><span class=n>now</span><span class=p>()</span> <span 
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span 
class=n>seconds</span><span class=o>=</span><span class=mi>10</span><span 
class=p>))</span>
+      <span class=n>is_timer_set</span><span class=o>.</span><span 
class=n>write</span><span class=p>(</span><span class=bp>True</span><span 
class=p>)</span>
+
+  <span class=nd>@on_timer</span><span class=p>(</span><span 
class=n>OUTPUT</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>output_callback</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+                      <span class=nb>buffer</span><span class=o>=</span><span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>BUFFER</span><span class=p>),</span>
+                      <span class=n>is_timer_set</span><span 
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>IS_TIMER_SET</span><span class=p>)):</span>
+    <span class=n>send_rpc</span><span class=p>(</span><span 
class=nb>list</span><span class=p>(</span><span class=nb>buffer</span><span 
class=o>.</span><span class=n>read</span><span class=p>()))</span>
+    <span class=nb>buffer</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    <span class=n>is_timer_set</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span></code></pre></div></div><h2 
id=splittable-dofns>12. Splittable <code>DoFns</code></h2><p>A Splittable 
<code>DoFn</code> (SDF) enables users to create modular components containing 
I/Os (and some advanced
 <a href="https://s.apache.org/splittable-do-fn#heading=h.5cep9s8k4fxv">non I/O 
use cases</a>). Having modular
 I/O components that can be connected to each other simplifies typical patterns 
that users want.
 For example, a popular use case is to read filenames from a message queue 
followed by parsing those
diff --git a/website/generated-content/sitemap.xml 
b/website/generated-content/sitemap.xml
index cc6607f..4bbf3a9 100644
--- a/website/generated-content/sitemap.xml
+++ b/website/generated-content/sitemap.xml
@@ -1 +1 @@
-<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset 
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" 
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/kafka-to-pubsub-example/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url>
 [...]
\ No newline at end of file
+<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset 
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9" 
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/categories/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url><loc>/blog/kafka-to-pubsub-example/</loc><lastmod>2021-01-20T19:53:05+03:00</lastmod></url><url>
 [...]
\ No newline at end of file
