This is an automated email from the ASF dual-hosted git repository.

git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 2714963  Publishing website 2020/06/06 00:06:29 at commit 42faa44
2714963 is described below

commit 2714963849242ea3871be308d119b8a599405c58
Author: jenkins <[email protected]>
AuthorDate: Sat Jun 6 00:06:30 2020 +0000

    Publishing website 2020/06/06 00:06:29 at commit 42faa44
---
 website/generated-content/documentation/index.xml  | 108 ++++++++++++++++++-
 .../documentation/programming-guide/index.html     | 119 ++++++++++++++++++---
 website/generated-content/get-started/index.xml    |   8 +-
 .../get-started/quickstart-py/index.html           |   2 +-
 website/generated-content/sitemap.xml              |   2 +-
 5 files changed, 220 insertions(+), 19 deletions(-)

diff --git a/website/generated-content/documentation/index.xml 
b/website/generated-content/documentation/index.xml
index 873fd6b..02ed7a7 100644
--- a/website/generated-content/documentation/index.xml
+++ b/website/generated-content/documentation/index.xml
@@ -5271,16 +5271,19 @@ the &lt;code>ParDo&lt;/code> these state variables can 
be used to write or updat
 written for that key. State is always fully scoped only to the current 
processing key.&lt;/p>
 &lt;p>Windowing can still be used together with stateful processing. All state 
for a key is scoped to the current window. This
 means that the first time a key is seen for a given window any state reads 
will return empty, and that a runner can
-garbage collect state when a window is completed. It&amp;rsquo;s also often 
useful to use Beam&amp;rsquo;s windowed aggegations prior to
+garbage collect state when a window is completed. It&amp;rsquo;s also often 
useful to use Beam&amp;rsquo;s windowed aggregations prior to
 the stateful operator. For example, using a combiner to preaggregate data, and 
then storing aggregated data inside of
 state. Merging windows are not currently supported when using state and 
timers.&lt;/p>
 &lt;p>Sometimes stateful processing is used to implement state-machine style 
processing inside a &lt;code>DoFn&lt;/code>. When doing this,
 care must be taken to remember that the elements in input PCollection have no 
guaranteed order and to ensure that the
 program logic is resilient to this. Unit tests written using the DirectRunner 
will shuffle the order of element
 processing, and are recommended to test for correctness.&lt;/p>
-&lt;p>In Java DoFn declares states to be accessed by creating final 
&lt;code>StateSpec&lt;/code> member variables representing each state. Each
+&lt;p class="language-java">In Java DoFn declares states to be accessed by 
creating final &lt;code>StateSpec&lt;/code> member variables representing each 
state. Each
 state must be named using the &lt;code>StateId&lt;/code> annotation; this name 
is unique to a ParDo in the graph and has no relation
 to other nodes in the graph. A &lt;code>DoFn&lt;/code> can declare multiple 
state variables.&lt;/p>
+&lt;p class="language-py">In Python DoFn declares states to be accessed by 
creating &lt;code>StateSpec&lt;/code> class member variables representing each 
state. Each
+&lt;code>StateSpec&lt;/code> is initialized with a name, this name is unique 
to a ParDo in the graph and has no relation
+to other nodes in the graph. A &lt;code>DoFn&lt;/code> can declare multiple 
state variables.&lt;/p>
 &lt;h3 id="types-of-state">11.1 Types of state&lt;/h3>
 &lt;p>Beam provides several types of state:&lt;/p>
 &lt;h4 id="valuestate">ValueState&lt;/h4>
@@ -5323,6 +5326,14 @@ accumulates the number of elements seen.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-python>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">CombiningStateDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;span class="n">SUM_TOTAL&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">CombiningValueStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;total&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="nb">sum&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">SoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">SUM_TOTAL&lt;/span>&lt;span 
class="p">)):&lt;/span>
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Combine state 
pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">CombiningStateDofn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="bagstate">BagState&lt;/h4>
 &lt;p>A common use case for state is to accumulate multiple elements. 
&lt;code>BagState&lt;/code> allows for accumulating an unordered set
 ofelements. This allows for addition of elements to the collection without 
requiring the reading of the entire
@@ -5346,6 +5357,18 @@ bags larger than available memory.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-python>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">BagStateDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element_pair&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">state&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">StateParam&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">ALL_ELEMENTS&lt;/span>&lt;span 
class="p" [...]
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">element_pair&lt;/span>&lt;span class="p">[&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">])&lt;/span>
+&lt;span class="k">if&lt;/span> &lt;span 
class="n">should_fetch&lt;/span>&lt;span class="p">():&lt;/span>
+&lt;span class="n">all_elements&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="nb">list&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">process_values&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">all_elements&lt;/span>&lt;span 
class="p">)&lt;/span>
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Bag state 
pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">BagStateDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h3 id="deferred-state-reads">11.2 Deferred state reads&lt;/h3>
 &lt;p>When a &lt;code>DoFn&lt;/code> contains multiple state specifications, 
reading each one in order can be slow. Calling the &lt;code>read()&lt;/code> 
function
 on a state can cause the runner to perform a blocking read. Performing 
multiple blocking reads in sequence adds latency
@@ -5420,12 +5443,30 @@ allows for event-time aggregations.&lt;/p>
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-python>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">EventTimerDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
+&lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">t&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimestampParam&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">timer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">element_pair&lt;/span>&lt;span class="p">[&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">])&lt;/span>
+&lt;span class="c1"># Set an event-time timer to the element 
timestamp.&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">t&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span 
class="nf">expiry_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span 
class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;EventTime timer 
pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">EventTimerDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="processing-time-timers">11.3.2 Processing-time timers&lt;/h4>
 &lt;p>Processing-time timers fire when the real wall-clock time passes. This 
is often used to create larger batches of data
 before processing. It can also be used to schedule events that should occur at 
a specific time. Just like with
 event-time timers, processing-time timers are per key - each key has a 
separate copy of the timer.&lt;/p>
 &lt;p>While processing-time timers can be set to an absolute timestamp, it is 
very common to set them to an offset relative
-to the current time. The &lt;code>Timer.offset&lt;/code> and 
&lt;code>Timer.setRelative&lt;/code> methods can be used to accomplish 
this.&lt;/p>
+to the current time. In Java, the &lt;code>Timer.offset&lt;/code> and 
&lt;code>Timer.setRelative&lt;/code> methods can be used to accomplish 
this.&lt;/p>
 &lt;div class=language-java>
 &lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-java" data-lang="java">&lt;span 
class="n">PCollection&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">KV&lt;/span>&lt;span class="o">&amp;lt;&lt;/span>&lt;span 
class="n">String&lt;/span>&lt;span class="o">,&lt;/span> &lt;span 
class="n">ValueT&lt;/span>&lt;span class="o">&amp;gt;&amp;gt;&lt;/span> 
&lt;span class="n">perUser&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">readPerUser&lt;/ [...]
 &lt;span class="n">perUser&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">apply&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="na">of&lt;/span>&lt;span class="o">(&lt;/span>&lt;span 
class="k">new&lt;/span> &lt;span class="n">DoFn&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">KV&lt;/span>&lt;span 
class="o">&amp;lt;&lt;/span>&lt;span class="n">String&lt;/span>&lt;span 
class="o">,&lt;/span [...]
@@ -5440,6 +5481,24 @@ to the current time. The &lt;code>Timer.offset&lt;/code> 
and &lt;code>Timer.setR
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-python>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">ProcessingTimerDoFn&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">DoFn&lt;/span>&lt;span 
class="p">):&lt;/span>
+&lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">REAL_TIME&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
+&lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">timer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="nb">buffer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">add&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">element_pair&lt;/span>&lt;span class="p">[&lt;/span>&lt;span 
class="mi">1&lt;/span>&lt;span class="p">])&lt;/span>
+&lt;span class="c1"># Set a timer to go off 30 seconds in the future.&lt;/span>
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">Timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">now&lt;/span>&lt;span class="p">()&lt;/span> &lt;span 
class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span 
class="o">=&lt;/span>&lt;span class="mi">30&lt;/span>&lt;span 
class="p">))&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span 
class="nf">expiry_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="bp">self&lt;/span>&lt;span class="p">,&lt;/span> &lt;span 
class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="c1"># Process timer.&lt;/span>
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;ProcessingTime 
timer pardo&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> 
&lt;span class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ProcessingTimerDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="dynamic-timer-tags">11.3.3 Dynamic timer tags&lt;/h4>
 &lt;p>Beam also supports dynamically setting a timer tag using 
&lt;code>TimerMap&lt;/code>. This allows for setting multiple different timers
 in a &lt;code>DoFn&lt;/code> and allowing for the timer tags to be dynamically 
chosen - e.g. based on data in the input elements. A
@@ -5462,6 +5521,9 @@ id, and timers in different timer families are 
independent.&lt;/p>
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-python>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="n">To&lt;/span> 
&lt;span class="n">be&lt;/span> &lt;span class="n">supported&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">See&lt;/span> &lt;span 
class="n">BEAM&lt;/span>&lt;span class="o">-&lt;/span>&lt;span 
class="mi">9602&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h4 id="timer-output-timestamps">11.3.4 Timer output timestamps&lt;/h4>
 &lt;p>By default, event-time timers will hold the output watermark of the 
&lt;code>ParDo&lt;/code> to the timestamp of the timer. This means
 that if a timer is set to 12pm, any windowed aggregations or event-time timers 
later in the pipeline graph that finish&lt;br>
@@ -5560,7 +5622,7 @@ performance. There are two common strategies for garbage 
collecting state.&lt;/p
 &lt;p>All state and timers for a key is scoped to the window it is in. This 
means that depending on the timestamp of the
 input element the ParDo will see different values for the state depending on 
the window that element falls into. In
 addition, once the input watermark passes the end of the window, the runner 
should garbage collect all state for that
-window. (note: if allowed lateness is set to a positive value for the window, 
the runner must wait for the watemark to
+window. (note: if allowed lateness is set to a positive value for the window, 
the runner must wait for the watermark to
 pass the end of the window plus the allowed lateness before garbage collecting 
state). This can be used as a
 garbage-collection strategy.&lt;/p>
 &lt;p>For example, given the following:&lt;/p>
@@ -5577,6 +5639,17 @@ garbage-collection strategy.&lt;/p>
 &lt;/span>&lt;span class="c1">&lt;/span> &lt;span class="o">}&lt;/span>
 &lt;span class="o">}));&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-python>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">StateDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;buffer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
+&lt;span class="n">element_pair&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="nb">buffer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="o">...&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span 
class="s1">&amp;#39;Windowing&amp;#39;&lt;/span> &lt;span 
class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">beam&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">WindowInto&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">FixedWindows&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="mi">60&lt;/span> &lt;span 
class="o">*&lt;/span> &lt;span class="mi">60&lt;/span> &lt;span 
class="o">*&lt;/span> &lt;span class="mi">24&lt; [...]
+&lt;span class="o">|&lt;/span> &lt;span 
class="s1">&amp;#39;DoFn&amp;#39;&lt;/span> &lt;span 
class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span class="n">beam&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">ParDo&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">StateDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;p>This &lt;code>ParDo&lt;/code> stores state per day. Once the pipeline is 
done processing data for a given day, all the state for that
 day is garbage collected.&lt;/p>
 &lt;h5 id="using-timers-for-garbage-collection">11.4.1 &lt;strong>Using timers 
For garbage collection&lt;/strong>&lt;/h5>
@@ -5615,6 +5688,33 @@ This can be done by updating a timer that garbage 
collects state. For example&lt
 &lt;span class="o">}&lt;/span>
 &lt;span class="o">}&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
 &lt;/div>
+&lt;div class=language-python>
+&lt;div class="highlight">&lt;pre class="chroma">&lt;code 
class="language-python" data-lang="python">&lt;span class="k">class&lt;/span> 
&lt;span class="nc">UserDoFn&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">DoFn&lt;/span>&lt;span class="p">):&lt;/span>
+&lt;span class="n">ALL_ELEMENTS&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">BagStateSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;state&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">coders&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">VarIntCoder&lt;/span>&lt;span class="p">())&lt;/span>
+&lt;span class="n">MAX_TIMESTAMP&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">CombiningValueStateSpec&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;max_timestamp_seen&amp;#39;&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="nb">max&lt;/span>&lt;span 
class="p">)&lt;/span>
+&lt;span class="n">TIMER&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">TimerSpec&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="s1">&amp;#39;gc-timer&amp;#39;&lt;/span>&lt;span class="p">,&lt;/span> 
&lt;span class="n">TimeDomain&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">WATERMARK&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span class="nf">process&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="bp">self&lt;/span>&lt;span 
class="p">,&lt;/span>
+&lt;span class="n">element&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">t&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimestampParam&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">state&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">max_timestamp&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">MAX_TIMESTAMP&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">timer&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">TimerParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="n">update_state&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">state&lt;/span>&lt;span 
class="p">,&lt;/span> &lt;span class="n">element&lt;/span>&lt;span 
class="p">)&lt;/span>
+&lt;span class="n">max_timestamp&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">add&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">t&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">micros&lt;/span>&lt;span 
class="p">)&lt;/span>
+&lt;span class="c1"># Set the timer to be one hour after the maximum timestamp 
seen. This will keep overwriting the same timer, so &lt;/span>
+&lt;span class="c1"># as long as there is activity on this key the state will 
stay active. Once the key goes inactive for one hour&amp;#39;s&lt;/span>
+&lt;span class="c1"># worth of event time (as measured by the watermark), then 
the gc timer will fire.&lt;/span>
+&lt;span class="n">expiration_time&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">Timestamp&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">micros&lt;/span>&lt;span class="o">=&lt;/span>&lt;span 
class="n">max_timestamp&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">read&lt;/span>&lt;span class="p">())&lt;/span> &lt;span 
class="o">+&lt;/span> &lt;span class="n">Duration&lt;/span>&lt;span 
class="p">(&lt;/span>&lt;span class="n">seconds&lt;/span>&lt;span cl [...]
+&lt;span class="n">timer&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">set&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">expiration_time&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="nd">@on_timer&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">TIMER&lt;/span>&lt;span class="p">)&lt;/span>
+&lt;span class="k">def&lt;/span> &lt;span 
class="nf">expiry_callback&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="bp">self&lt;/span>&lt;span class="p">,&lt;/span>
+&lt;span class="n">state&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">ALL_ELEMENTS&lt;/span>&lt;span class="p">),&lt;/span>
+&lt;span class="n">max_timestamp&lt;/span> &lt;span class="o">=&lt;/span> 
&lt;span class="n">DoFn&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">StateParam&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">MAX_TIMESTAMP&lt;/span>&lt;span class="p">)):&lt;/span>
+&lt;span class="n">state&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">clear&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="n">max_timestamp&lt;/span>&lt;span 
class="o">.&lt;/span>&lt;span class="n">clear&lt;/span>&lt;span 
class="p">()&lt;/span>
+&lt;span class="n">_&lt;/span> &lt;span class="o">=&lt;/span> &lt;span 
class="p">(&lt;/span>&lt;span class="n">p&lt;/span> &lt;span 
class="o">|&lt;/span> &lt;span class="s1">&amp;#39;Read per 
user&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">ReadPerUser&lt;/span>&lt;span class="p">()&lt;/span>
+&lt;span class="o">|&lt;/span> &lt;span class="s1">&amp;#39;User 
DoFn&amp;#39;&lt;/span> &lt;span class="o">&amp;gt;&amp;gt;&lt;/span> &lt;span 
class="n">beam&lt;/span>&lt;span class="o">.&lt;/span>&lt;span 
class="n">ParDo&lt;/span>&lt;span class="p">(&lt;/span>&lt;span 
class="n">UserDoFn&lt;/span>&lt;span 
class="p">()))&lt;/span>&lt;/code>&lt;/pre>&lt;/div>
+&lt;/div>
 &lt;h3 id="state-timers-examples">11.5 State and timers examples&lt;/h3>
 &lt;p>Following are some example uses of state and timers&lt;/p>
 &lt;h4 id="joining-clicks-and-views">11.5.1. Joining clicks and views&lt;/h4>
diff --git 
a/website/generated-content/documentation/programming-guide/index.html 
b/website/generated-content/documentation/programming-guide/index.html
index 39612c3..d24661c 100644
--- a/website/generated-content/documentation/programming-guide/index.html
+++ b/website/generated-content/documentation/programming-guide/index.html
@@ -2223,13 +2223,15 @@ is modeled as a <code>PCollection&lt;KV&lt;K, 
V>></code>. A <code>ParDo</code> p
 the <code>ParDo</code> these state variables can be used to write or update 
state for the current key or to read previous state
 written for that key. State is always fully scoped only to the current 
processing key.</p><p>Windowing can still be used together with stateful 
processing. All state for a key is scoped to the current window. This
 means that the first time a key is seen for a given window any state reads 
will return empty, and that a runner can
-garbage collect state when a window is completed. It&rsquo;s also often useful 
to use Beam&rsquo;s windowed aggegations prior to
+garbage collect state when a window is completed. It&rsquo;s also often useful 
to use Beam&rsquo;s windowed aggregations prior to
 the stateful operator. For example, using a combiner to preaggregate data, and 
then storing aggregated data inside of
 state. Merging windows are not currently supported when using state and 
timers.</p><p>Sometimes stateful processing is used to implement state-machine 
style processing inside a <code>DoFn</code>. When doing this,
 care must be taken to remember that the elements in input PCollection have no 
guaranteed order and to ensure that the
 program logic is resilient to this. Unit tests written using the DirectRunner 
will shuffle the order of element
-processing, and are recommended to test for correctness.</p><p>In Java DoFn 
declares states to be accessed by creating final <code>StateSpec</code> member 
variables representing each state. Each
+processing, and are recommended to test for correctness.</p><p 
class=language-java>In Java DoFn declares states to be accessed by creating 
final <code>StateSpec</code> member variables representing each state. Each
 state must be named using the <code>StateId</code> annotation; this name is 
unique to a ParDo in the graph and has no relation
+to other nodes in the graph. A <code>DoFn</code> can declare multiple state 
variables.</p><p class=language-py>In Python DoFn declares states to be 
accessed by creating <code>StateSpec</code> class member variables representing 
each state. Each
+<code>StateSpec</code> is initialized with a name, this name is unique to a 
ParDo in the graph and has no relation
 to other nodes in the graph. A <code>DoFn</code> can declare multiple state 
variables.</p><h3 id=types-of-state>11.1 Types of state</h3><p>Beam provides 
several types of state:</p><h4 id=valuestate>ValueState</h4><p>A ValueState is 
a scalar state value. For each key in the input, a ValueState will store a 
typed value that can be
 read and modified inside the DoFn&rsquo;s <code>@ProcessElement</code> or 
<code>@OnTimer</code> methods. If the type of the ValueState has a coder
 registered, then Beam will automatically infer the coder for the state value. 
Otherwise, a coder can be explicitly
@@ -2258,7 +2260,14 @@ accumulates the number of elements seen.</p><div 
class=language-java><div class=
   <span class=nd>@ProcessElement</span> <span class=kd>public</span> <span 
class=kt>void</span> <span class=nf>process</span><span class=o>(</span><span 
class=nd>@StateId</span><span class=o>(</span><span 
class=s>&#34;state&#34;</span><span class=o>)</span> <span 
class=n>ValueState</span><span class=o>&lt;</span><span 
class=n>Integer</span><span class=o>&gt;</span> <span class=n>state</span><span 
class=o>)</span> <span class=o>{</span>
     <span class=n>state</span><span class=o>.</span><span 
class=na>add</span><span class=o>(</span><span class=n>1</span><span 
class=o>);</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4 
id=bagstate>BagState</h4><p>A common use case for state is to accumulate 
multiple elements. <code>BagState</code> allows for accumulating an unordered 
set
+<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>CombiningStateDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+  <span class=n>SUM_TOTAL</span> <span class=o>=</span> <span 
class=n>CombiningValueStateSpec</span><span class=p>(</span><span 
class=s1>&#39;total&#39;</span><span class=p>,</span> <span 
class=nb>sum</span><span class=p>)</span>
+  
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=n>element</span><span class=p>,</span> <span class=n>state</span><span 
class=o>=</span><span class=n>SoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>SUM_TOTAL</span><span class=p>)):</span>
+    <span class=n>state</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=mi>1</span><span 
class=p>)</span>
+    
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;Combine state 
pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>CombiningStateDofn</span><span 
class=p>()))</span></code></pre></div></div><h4 id=bagstate>BagState</h4><p>A 
common use case for state is to accumulate multiple elements. 
<code>BagState</code> allows for accumulating an unordered set
 ofelements. This allows for addition of elements to the collection without 
requiring the reading of the entire
 collection first, which is an efficiency gain. In addition, runners that 
support paged reads can allow individual
 bags larger than available memory.</p><div class=language-java><div 
class=highlight><pre class=chroma><code class=language-java 
data-lang=java><span class=n>PCollection</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;&gt;</span> <span 
class=n>perUser</span> <span class=o>=</span> <span 
class=n>readPerUser</span><span class=o>();</span>
@@ -2277,7 +2286,18 @@ bags larger than available memory.</p><div 
class=language-java><div class=highli
       <span class=n>state</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>  <span class=c1>// Clear the 
state for this key.
 </span><span class=c1></span>    <span class=o>}</span>
   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h3 
id=deferred-state-reads>11.2 Deferred state reads</h3><p>When a 
<code>DoFn</code> contains multiple state specifications, reading each one in 
order can be slow. Calling the <code>read()</code> function
+<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>BagStateDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+  <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
+  
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=n>element_pair</span><span class=p>,</span> <span 
class=n>state</span><span class=o>=</span><span class=n>DoFn</span><span 
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span 
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+    <span class=n>state</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span 
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+    <span class=k>if</span> <span class=n>should_fetch</span><span 
class=p>():</span>
+      <span class=n>all_elements</span> <span class=o>=</span> <span 
class=nb>list</span><span class=p>(</span><span class=n>state</span><span 
class=o>.</span><span class=n>read</span><span class=p>())</span>
+      <span class=n>process_values</span><span class=p>(</span><span 
class=n>all_elements</span><span class=p>)</span>
+      <span class=n>state</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;Bag state pardo&#39;</span> 
<span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>BagStateDoFn</span><span 
class=p>()))</span></code></pre></div></div><h3 id=deferred-state-reads>11.2 
Deferred state reads</h3><p>When a <code>DoFn</code> contains multiple state 
specifications, reading each one in order can be slow. Calling the 
<code>read()</code> function
 on a state can cause the runner to perform a blocking read. Performing 
multiple blocking reads in sequence adds latency
 to element processing. If you know that a state will always be read, you can 
annotate it as @AlwaysFetched, and then the
 runner can prefetch all of the states necessary. For example:</p><div 
class=language-java><div class=highlight><pre class=chroma><code 
class=language-java data-lang=java><span class=n>PCollection</span><span 
class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span 
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span 
class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> 
<span class=n>readPerUser</span><span class=o>();</span>
@@ -2313,7 +2333,7 @@ be read in the future, allowing multiple state reads to 
be batched together.</p>
     <span class=o>}</span>
    
     <span class=c1>// The runner can now batch all three states into a single 
read, reducing latency.
-</span><span class=c1></span>     <span class=n>processState1</span><span 
class=o>(</span><span class=n>state1</span><span class=o>.</span><span 
class=na>read</span><span class=o>());</span>
+</span><span class=c1></span>    <span class=n>processState1</span><span 
class=o>(</span><span class=n>state1</span><span class=o>.</span><span 
class=na>read</span><span class=o>());</span>
     <span class=n>processState2</span><span class=o>(</span><span 
class=n>state2</span><span class=o>.</span><span class=na>read</span><span 
class=o>());</span>
     <span class=n>processState3</span><span class=o>(</span><span 
class=n>state3</span><span class=o>.</span><span class=na>read</span><span 
class=o>());</span>
   <span class=o>}</span>
@@ -2340,10 +2360,28 @@ allows for event-time aggregations.</p><div 
class=language-java><div class=highl
    <span class=nd>@OnTimer</span><span class=o>(</span><span 
class=s>&#34;timer&#34;</span><span class=o>)</span> <span 
class=kd>public</span> <span class=kt>void</span> <span 
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
       <span class=c1>//Process timer.
 </span><span class=c1></span>   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4 
id=processing-time-timers>11.3.2 Processing-time timers</h4><p>Processing-time 
timers fire when the real wall-clock time passes. This is often used to create 
larger batches of data
+<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>EventTimerDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+  <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
+  <span class=n>TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;timer&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>WATERMARK</span><span class=p>)</span>
+  
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> 
+              <span class=n>element_pair</span><span class=p>,</span> 
+              <span class=n>t</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span 
class=n>TimestampParam</span><span class=p>,</span>
+              <span class=nb>buffer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span> 
+              <span class=n>timer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span 
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+    <span class=nb>buffer</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span 
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+    <span class=c1># Set an event-time timer to the element timestamp.</span>
+    <span class=n>timer</span><span class=o>.</span><span 
class=n>set</span><span class=p>(</span><span class=n>t</span><span 
class=p>)</span>
+  
+  <span class=nd>@on_timer</span><span class=p>(</span><span 
class=n>TIMER</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>expiry_callback</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span 
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span 
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+    <span class=n>state</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;EventTime timer 
pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>EventTimerDoFn</span><span 
class=p>()))</span></code></pre></div></div><h4 
id=processing-time-timers>11.3.2 Processing-time timers</h4><p>Processing-time 
timers fire when the real wall-clock time passes. This is often used to create 
larger batches of data
 before processing. It can also be used to schedule events that should occur at 
a specific time. Just like with
 event-time timers, processing-time timers are per key - each key has a 
separate copy of the timer.</p><p>While processing-time timers can be set to an 
absolute timestamp, it is very common to set them to an offset relative
-to the current time. The <code>Timer.offset</code> and 
<code>Timer.setRelative</code> methods can be used to accomplish this.</p><div 
class=language-java><div class=highlight><pre class=chroma><code 
class=language-java data-lang=java><span class=n>PCollection</span><span 
class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span 
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span 
class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</ [...]
+to the current time. In Java, the <code>Timer.offset</code> and 
<code>Timer.setRelative</code> methods can be used to accomplish this.</p><div 
class=language-java><div class=highlight><pre class=chroma><code 
class=language-java data-lang=java><span class=n>PCollection</span><span 
class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span 
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span 
class=o>&gt;&gt;</span> <span class=n>perUser</span> <span cl [...]
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span 
class=o>.</span><span class=na>of</span><span class=o>(</span><span 
class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span 
class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span 
class=o>,</span> <span class=n>ValueT</span><span class=o>&gt;,</span> <span 
class=n>OutputT</span><span class=o>&gt;()</span> <span class=o>{</span>
   <span class=nd>@TimerId</span><span class=o>(</span><span 
class=s>&#34;timer&#34;</span><span class=o>)</span> <span 
class=kd>private</span> <span class=kd>final</span> <span 
class=n>TimerSpec</span> <span class=n>timer</span> <span class=o>=</span> 
<span class=n>TimerSpecs</span><span class=o>.</span><span 
class=na>timer</span><span class=o>(</span><span class=n>TimeDomain</span><span 
class=o>.</span><span class=na>PROCESSING_TIME</span><span class=o>);</span>
 
@@ -2356,7 +2394,25 @@ to the current time. The <code>Timer.offset</code> and 
<code>Timer.setRelative</
    <span class=nd>@OnTimer</span><span class=o>(</span><span 
class=s>&#34;timer&#34;</span><span class=o>)</span> <span 
class=kd>public</span> <span class=kt>void</span> <span 
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
       <span class=c1>//Process timer.
 </span><span class=c1></span>   <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4 
id=dynamic-timer-tags>11.3.3 Dynamic timer tags</h4><p>Beam also supports 
dynamically setting a timer tag using <code>TimerMap</code>. This allows for 
setting multiple different timers
+<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>ProcessingTimerDoFn</span><span class=p>(</span><span 
class=n>DoFn</span><span class=p>):</span>
+  <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
+  <span class=n>TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;timer&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>REAL_TIME</span><span class=p>)</span>
+  
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> 
+              <span class=n>element_pair</span><span class=p>,</span> 
+              <span class=nb>buffer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span> 
+              <span class=n>timer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span 
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+    <span class=nb>buffer</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span 
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+    <span class=c1># Set a timer to go off 30 seconds in the future.</span>
+    <span class=n>timer</span><span class=o>.</span><span 
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span 
class=o>.</span><span class=n>now</span><span class=p>()</span> <span 
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span 
class=n>seconds</span><span class=o>=</span><span class=mi>30</span><span 
class=p>))</span>
+  
+  <span class=nd>@on_timer</span><span class=p>(</span><span 
class=n>TIMER</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>expiry_callback</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span 
class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span 
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span 
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+    <span class=c1># Process timer.</span>
+    <span class=n>state</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;ProcessingTime timer 
pardo&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span 
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span 
class=n>ProcessingTimerDoFn</span><span 
class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3 
Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag 
using <code>TimerMap</code>. This allows for setting multiple different timers
 in a <code>DoFn</code> and allowing for the timer tags to be dynamically 
chosen - e.g. based on data in the input elements. A
 timer with a specific tag can only be set to a single timestamp, so setting 
the timer again has the effect of
 overwriting the previous expiration time for the timer with that tag. Each 
<code>TimerMap</code> is identified with a timer family
@@ -2375,7 +2431,7 @@ id, and timers in different timer families are 
independent.</p><div class=langua
    <span class=nd>@OnTimerFamily</span><span class=o>(</span><span 
class=s>&#34;actionTimers&#34;</span><span class=o>)</span> <span 
class=kd>public</span> <span class=kt>void</span> <span 
class=nf>onTimer</span><span class=o>(</span><span class=nd>@TimerId</span> 
<span class=n>String</span> <span class=n>timerId</span><span class=o>)</span> 
<span class=o>{</span>
      <span class=n>LOG</span><span class=o>.</span><span 
class=na>info</span><span class=o>(</span><span class=s>&#34;Timer fired with 
id &#34;</span> <span class=o>+</span> <span class=n>timerId</span><span 
class=o>);</span>
    <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4 
id=timer-output-timestamps>11.3.4 Timer output timestamps</h4><p>By default, 
event-time timers will hold the output watermark of the <code>ParDo</code> to 
the timestamp of the timer. This means
+<span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=n>To</span> <span 
class=n>be</span> <span class=n>supported</span><span class=p>,</span> <span 
class=n>See</span> <span class=n>BEAM</span><span class=o>-</span><span 
class=mi>9602</span></code></pre></div></div><h4 
id=timer-output-timestamps>11.3.4 Timer output timestamps</h4><p>By default, 
event-time timers will hol [...]
 that if a timer is set to 12pm, any windowed aggregations or event-time timers 
later in the pipeline graph that finish<br>after 12pm will not expire. The 
timestamp of the timer is also the default output timestamp for the timer 
callback. This
 means that any elements output from the onTimer method will have a timestamp 
equal to the timestamp of the timer firing.
 For processing-time timers, the default output timestamp and watermark hold is 
the value of the input watermark at the
@@ -2465,7 +2521,7 @@ past the timestamp of the minimum element. The following 
code demonstrates this.
 performance. There are two common strategies for garbage collecting 
state.</p><h5 id=using-windows-for-garbage-collection>11.4.1 <strong>Using 
windows for garbage collection</strong></h5><p>All state and timers for a key 
is scoped to the window it is in. This means that depending on the timestamp of 
the
 input element the ParDo will see different values for the state depending on 
the window that element falls into. In
 addition, once the input watermark passes the end of the window, the runner 
should garbage collect all state for that
-window. (note: if allowed lateness is set to a positive value for the window, 
the runner must wait for the watemark to
+window. (note: if allowed lateness is set to a positive value for the window, 
the runner must wait for the watermark to
 pass the end of the window plus the allowed lateness before garbage collecting 
state). This can be used as a
 garbage-collection strategy.</p><p>For example, given the following:</p><div 
class=language-java><div class=highlight><pre class=chroma><code 
class=language-java data-lang=java><span class=n>PCollection</span><span 
class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span 
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span 
class=o>&gt;&gt;</span> <span class=n>perUser</span> <span class=o>=</span> 
<span class=n>readPerUser</span><span class=o>();</span>
 <span class=n>perUser</span><span class=o>.</span><span 
class=na>apply</span><span class=o>(</span><span class=n>Window</span><span 
class=o>.</span><span class=na>into</span><span class=o>(</span><span 
class=n>CalendarWindows</span><span class=o>.</span><span 
class=na>days</span><span class=o>(</span><span class=n>1</span><span 
class=o>)</span>
@@ -2477,7 +2533,17 @@ garbage-collection strategy.</p><p>For example, given 
the following:</p><div cla
               <span class=c1>// The state is scoped to a calendar day window. 
That means that if the input timestamp ts is after
 </span><span class=c1></span>              <span class=c1>// midnight PST, 
then a new copy of the state will be seen for the next day.
 </span><span class=c1></span>           <span class=o>}</span>
-         <span class=o>}));</span></code></pre></div></div><p>This 
<code>ParDo</code> stores state per day. Once the pipeline is done processing 
data for a given day, all the state for that
+         <span class=o>}));</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span 
class=p>):</span>
+  <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;buffer&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
+  
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> 
+              <span class=n>element_pair</span><span class=p>,</span> 
+              <span class=nb>buffer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+    <span class=o>...</span>
+    
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;Windowing&#39;</span> <span 
class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span 
class=n>WindowInto</span><span class=p>(</span><span 
class=n>FixedWindows</span><span class=p>(</span><span class=mi>60</span> <span 
class=o>*</span> <span class=mi>60</span> <span class=o>*</span> <span 
class=mi>24</span><span class=p>))</span>
+       <span class=o>|</span> <span class=s1>&#39;DoFn&#39;</span> <span 
class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span 
class=n>ParDo</span><span class=p>(</span><span class=n>StateDoFn</span><span 
class=p>()))</span></code></pre></div></div><p>This <code>ParDo</code> stores 
state per day. Once the pipeline is done processing data for a given day, all 
the state for that
 day is garbage collected.</p><h5 id=using-timers-for-garbage-collection>11.4.1 
<strong>Using timers For garbage collection</strong></h5><p>In some cases, it 
is difficult to find a windowing strategy that models the desired 
garbage-collection strategy. For
 example, a common desire is to garbage collect state for a key once no 
activity has been seen on the key for some time.
 This can be done by updating a timer that garbage collects state. For 
example</p><div class=language-java><div class=highlight><pre 
class=chroma><code class=language-java data-lang=java><span 
class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span 
class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span 
class=n>ValueT</span><span class=o>&gt;&gt;</span> <span class=n>perUser</span> 
<span class=o>=</span> <span class=n>readPerUser</span><span clas [...]
@@ -2514,7 +2580,36 @@ This can be done by updating a timer that garbage 
collects state. For example</p
 </span><span class=c1></span>       <span class=n>state</span><span 
class=o>.</span><span class=na>clear</span><span class=o>();</span>
        <span class=n>maxTimestamp</span><span class=o>.</span><span 
class=na>clear</span><span class=o>();</span>
     <span class=o>}</span>
- <span class=o>}</span></code></pre></div></div><h3 
id=state-timers-examples>11.5 State and timers examples</h3><p>Following are 
some example uses of state and timers</p><h4 
id=joining-clicks-and-views>11.5.1. Joining clicks and views</h4><p>In this 
example, the pipeline is processing data from an e-commerce site&rsquo;s home 
page. There are two input streams:
+ <span class=o>}</span></code></pre></div></div><div 
class=language-python><div class=highlight><pre class=chroma><code 
class=language-python data-lang=python><span class=k>class</span> <span 
class=nc>UserDoFn</span><span class=p>(</span><span class=n>DoFn</span><span 
class=p>):</span>
+  <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span 
class=n>BagStateSpec</span><span class=p>(</span><span 
class=s1>&#39;state&#39;</span><span class=p>,</span> <span 
class=n>coders</span><span class=o>.</span><span 
class=n>VarIntCoder</span><span class=p>())</span>
+  <span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span 
class=n>CombiningValueStateSpec</span><span class=p>(</span><span 
class=s1>&#39;max_timestamp_seen&#39;</span><span class=p>,</span> <span 
class=nb>max</span><span class=p>)</span>
+  <span class=n>TIMER</span> <span class=o>=</span> <span 
class=n>TimerSpec</span><span class=p>(</span><span 
class=s1>&#39;gc-timer&#39;</span><span class=p>,</span> <span 
class=n>TimeDomain</span><span class=o>.</span><span 
class=n>WATERMARK</span><span class=p>)</span>
+  
+  <span class=k>def</span> <span class=nf>process</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> 
+              <span class=n>element</span><span class=p>,</span> 
+              <span class=n>t</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span 
class=n>TimestampParam</span><span class=p>,</span>
+              <span class=n>state</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span> 
+              <span class=n>max_timestamp</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>MAX_TIMESTAMP</span><span class=p>),</span>
+              <span class=n>timer</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span 
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+    <span class=n>update_state</span><span class=p>(</span><span 
class=n>state</span><span class=p>,</span> <span class=n>element</span><span 
class=p>)</span>
+    <span class=n>max_timestamp</span><span class=o>.</span><span 
class=n>add</span><span class=p>(</span><span class=n>t</span><span 
class=o>.</span><span class=n>micros</span><span class=p>)</span>
+    
+    <span class=c1># Set the timer to be one hour after the maximum timestamp 
seen. This will keep overwriting the same timer, so </span>
+    <span class=c1># as long as there is activity on this key the state will 
stay active. Once the key goes inactive for one hour&#39;s</span>
+    <span class=c1># worth of event time (as measured by the watermark), then 
the gc timer will fire.</span>
+    <span class=n>expiration_time</span> <span class=o>=</span> <span 
class=n>Timestamp</span><span class=p>(</span><span class=n>micros</span><span 
class=o>=</span><span class=n>max_timestamp</span><span class=o>.</span><span 
class=n>read</span><span class=p>())</span> <span class=o>+</span> <span 
class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span 
class=o>=</span><span class=mi>60</span><span class=o>*</span><span 
class=mi>60</span><span class=p>)</span>
+    <span class=n>timer</span><span class=o>.</span><span 
class=n>set</span><span class=p>(</span><span 
class=n>expiration_time</span><span class=p>)</span>
+  
+  <span class=nd>@on_timer</span><span class=p>(</span><span 
class=n>TIMER</span><span class=p>)</span>
+  <span class=k>def</span> <span class=nf>expiry_callback</span><span 
class=p>(</span><span class=bp>self</span><span class=p>,</span> 
+                      <span class=n>state</span> <span class=o>=</span> <span 
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span 
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+                      <span class=n>max_timestamp</span> <span 
class=o>=</span> <span class=n>DoFn</span><span class=o>.</span><span 
class=n>StateParam</span><span class=p>(</span><span 
class=n>MAX_TIMESTAMP</span><span class=p>)):</span>
+    <span class=n>state</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+    <span class=n>max_timestamp</span><span class=o>.</span><span 
class=n>clear</span><span class=p>()</span>
+  
+    
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span 
class=n>p</span> <span class=o>|</span> <span class=s1>&#39;Read per 
user&#39;</span> <span class=o>&gt;&gt;</span> <span 
class=n>ReadPerUser</span><span class=p>()</span>
+       <span class=o>|</span> <span class=s1>&#39;User DoFn&#39;</span> <span 
class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span 
class=n>ParDo</span><span class=p>(</span><span class=n>UserDoFn</span><span 
class=p>()))</span></code></pre></div></div><h3 id=state-timers-examples>11.5 
State and timers examples</h3><p>Following are some example uses of state and 
timers</p><h4 id=joining-clicks-and-views>11.5.1. Joining clicks and 
views</h4><p>In this example, the p [...]
 a stream of views, representing suggested product links displayed to the user 
on the home page, and a stream of
 clicks, representing actual user clicks on these links. The goal of the 
pipeline is to join click events with view
 events, outputting a new joined event that contains information from both 
events. Each link has a unique identifier
diff --git a/website/generated-content/get-started/index.xml 
b/website/generated-content/get-started/index.xml
index b2b78eb..18c0423 100644
--- a/website/generated-content/get-started/index.xml
+++ b/website/generated-content/get-started/index.xml
@@ -1399,7 +1399,7 @@ environment&amp;rsquo;s directories.&lt;/p>
 &lt;pre>&lt;code>PS&amp;gt; python -m pip install 
apache-beam&lt;/code>&lt;/pre>
 &lt;/div>
 &lt;h4 id="extra-requirements">Extra requirements&lt;/h4>
-&lt;p>The above installation will not install all the extra dependencies for 
using features like the Google Cloud Dataflow runner. Information on what extra 
packages are required for different features are highlighted below. It is 
possible to install multitple extra requirements using something like 
&lt;code>pip install apache-beam[feature1,feature2]&lt;/code>.&lt;/p>
+&lt;p>The above installation will not install all the extra dependencies for 
using features like the Google Cloud Dataflow runner. Information on what extra 
packages are required for different features are highlighted below. It is 
possible to install multiple extra requirements using something like 
&lt;code>pip install apache-beam[feature1,feature2]&lt;/code>.&lt;/p>
 &lt;ul>
 &lt;li>&lt;strong>Google Cloud Platform&lt;/strong>
 &lt;ul>
@@ -1414,6 +1414,12 @@ environment&amp;rsquo;s directories.&lt;/p>
 &lt;/li>
 &lt;/ul>
 &lt;/li>
+&lt;li>&lt;strong>Amazon Web Services&lt;/strong>
+&lt;ul>
+&lt;li>Installation Command: &lt;code>pip install 
apache-beam[aws]&lt;/code>&lt;/li>
+&lt;li>Required for I/O connectors interfacing with AWS&lt;/li>
+&lt;/ul>
+&lt;/li>
 &lt;li>&lt;strong>Tests&lt;/strong>
 &lt;ul>
 &lt;li>Installation Command: &lt;code>pip install 
apache-beam[test]&lt;/code>&lt;/li>
diff --git a/website/generated-content/get-started/quickstart-py/index.html 
b/website/generated-content/get-started/quickstart-py/index.html
index 855b217..f26cc7f 100644
--- a/website/generated-content/get-started/quickstart-py/index.html
+++ b/website/generated-content/get-started/quickstart-py/index.html
@@ -9,7 +9,7 @@ administrative privileges.</p><div 
class=shell-unix><pre><code>pip install --upg
 <code>setuptools</code> is installed on your machine. If you do not have 
<code>setuptools</code>
 version 17.1 or newer, run the following command to install it.</p><div 
class=shell-unix><pre><code>pip install --upgrade 
setuptools</code></pre></div><div class=shell-PowerShell><pre><code>PS&gt; 
python -m pip install --upgrade setuptools</code></pre></div><h2 
id=get-apache-beam>Get Apache Beam</h2><h3 
id=create-and-activate-a-virtual-environment>Create and activate a virtual 
environment</h3><p>A virtual environment is a directory tree containing its own 
Python distribution. To create a [...]
 Activating it sets some environment variables that point to the virtual
-environment&rsquo;s directories.</p><p>To activate a virtual environment in 
Bash, run:</p><div class=shell-unix><pre><code>. 
/path/to/directory/bin/activate</code></pre></div><div 
class=shell-PowerShell><pre><code>PS&gt; 
C:\path\to\directory\Scripts\activate.ps1</code></pre></div><p>That is, execute 
the <code>activate</code> script under the virtual environment directory you 
created.</p><p>For instructions using other shells, see the <a 
href=https://virtualenv.pypa.io/en/stable/userguide [...]
+environment&rsquo;s directories.</p><p>To activate a virtual environment in 
Bash, run:</p><div class=shell-unix><pre><code>. 
/path/to/directory/bin/activate</code></pre></div><div 
class=shell-PowerShell><pre><code>PS&gt; 
C:\path\to\directory\Scripts\activate.ps1</code></pre></div><p>That is, execute 
the <code>activate</code> script under the virtual environment directory you 
created.</p><p>For instructions using other shells, see the <a 
href=https://virtualenv.pypa.io/en/stable/userguide [...]
 See https://beam.apache.org/roadmap/portability/#python-on-flink for more 
information.</code></pre></div><div 
class=runner-flink-cluster><pre><code>Currently, running wordcount.py on Flink 
requires a full download of the Beam source code.
 See https://beam.apache.org/documentation/runners/flink/ for more 
information.</code></pre></div><div class=runner-spark><pre><code>Currently, 
running wordcount.py on Spark requires a full download of the Beam source code.
 See https://beam.apache.org/roadmap/portability/#python-on-spark for more 
information.</code></pre></div><div class=runner-dataflow><pre><code># As part 
of the initial setup, install Google Cloud Platform specific extra components. 
Make sure you
diff --git a/website/generated-content/sitemap.xml 
b/website/generated-content/sitemap.xml
index c71c97b..aafbf27 100644
--- a/website/generated-content/sitemap.xml
+++ b/website/generated-content/sitemap.xml
@@ -1 +1 @@
-<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset 
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"; 
xmlns:xhtml="http://www.w3.org/1999/xhtml";><url><loc>/categories/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/python/</loc><lastmod>2020-05-28T15:14:36-07:00</lastmod></url><url><loc>/blog/
 [...]
\ No newline at end of file
+<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset 
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"; 
xmlns:xhtml="http://www.w3.org/1999/xhtml";><url><loc>/categories/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/python/</loc><lastmod>2020-05-28T15:14:36-07:00</lastmod></url><url><loc>/blog/
 [...]
\ No newline at end of file

Reply via email to