This is an automated email from the ASF dual-hosted git repository.
git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 2714963 Publishing website 2020/06/06 00:06:29 at commit 42faa44
2714963 is described below
commit 2714963849242ea3871be308d119b8a599405c58
Author: jenkins
AuthorDate: Sat Jun 6 00:06:30 2020 +0000
Publishing website 2020/06/06 00:06:29 at commit 42faa44
---
website/generated-content/documentation/index.xml | 108 ++++++++++++++++++-
.../documentation/programming-guide/index.html | 119 ++++++++++++++++++---
website/generated-content/get-started/index.xml | 8 +-
.../get-started/quickstart-py/index.html | 2 +-
website/generated-content/sitemap.xml | 2 +-
5 files changed, 220 insertions(+), 19 deletions(-)
diff --git a/website/generated-content/documentation/index.xml
b/website/generated-content/documentation/index.xml
index 873fd6b..02ed7a7 100644
--- a/website/generated-content/documentation/index.xml
+++ b/website/generated-content/documentation/index.xml
@@ -5271,16 +5271,19 @@ the <code>ParDo</code> these state variables can
be used to write or updat
written for that key. State is always fully scoped only to the current
processing key.</p>
<p>Windowing can still be used together with stateful processing. All state
for a key is scoped to the current window. This
means that the first time a key is seen for a given window any state reads
will return empty, and that a runner can
-garbage collect state when a window is completed. It&rsquo;s also often
useful to use Beam&rsquo;s windowed aggegations prior to
+garbage collect state when a window is completed. It&rsquo;s also often
useful to use Beam&rsquo;s windowed aggregations prior to
the stateful operator. For example, using a combiner to preaggregate data, and
then storing aggregated data inside of
state. Merging windows are not currently supported when using state and
timers.</p>
<p>Sometimes stateful processing is used to implement state-machine style
processing inside a <code>DoFn</code>. When doing this,
care must be taken to remember that the elements in the input PCollection have no
guaranteed order and to ensure that the
program logic is resilient to this. Unit tests written using the DirectRunner
will shuffle the order of element
processing, and are recommended to test for correctness.</p>
-<p>In Java DoFn declares states to be accessed by creating final
<code>StateSpec</code> member variables representing each state. Each
+<p class="language-java">In Java, a DoFn declares the states it accesses by
creating final <code>StateSpec</code> member variables representing each
state. Each
state must be named using the <code>StateId</code> annotation; this name
is unique to a ParDo in the graph and has no relation
to other nodes in the graph. A <code>DoFn</code> can declare multiple
state variables.</p>
+<p class="language-py">In Python, a DoFn declares the states it accesses by
creating <code>StateSpec</code> class member variables representing each
state. Each
+<code>StateSpec</code> is initialized with a name; this name is unique
to a ParDo in the graph and has no relation
+to other nodes in the graph. A <code>DoFn</code> can declare multiple
state variables.</p>
<h3 id="types-of-state">11.1 Types of state</h3>
<p>Beam provides several types of state:</p>
<h4 id="valuestate">ValueState</h4>
@@ -5323,6 +5326,14 @@ accumulates the number of elements seen.</p>
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-python>
+<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">CombiningStateDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<span class="n">SUM_TOTAL</span> <span class="o">=</span> <span
class="n">CombiningValueStateSpec</span><span
class="p">(</span><span
class="s1">&#39;total&#39;</span><span class="p">,</span>
<span class="nb">sum</span><span class="p">)</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span> <span class="n">element</span><span
class="p">,</span> <span class="n">state</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">SUM_TOTAL</span><span
class="p">)):</span>
+<span class="n">state</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="mi">1</span><span class="p">)</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;Combine state
pardo&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">CombiningStateDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
<h4 id="bagstate">BagState</h4>
<p>A common use case for state is to accumulate multiple elements.
<code>BagState</code> allows for accumulating an unordered set
of elements. This allows for addition of elements to the collection without
requiring the reading of the entire
@@ -5346,6 +5357,18 @@ bags larger than available memory.</p>
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-python>
+<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">BagStateDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span> <span class="n">element_pair</span><span
class="p">,</span> <span class="n">state</span><span
class="o">=</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">ALL_ELEMENTS</span><span
class="p" [...]
+<span class="n">state</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="n">element_pair</span><span class="p">[</span><span
class="mi">1</span><span class="p">])</span>
+<span class="k">if</span> <span
class="n">should_fetch</span><span class="p">():</span>
+<span class="n">all_elements</span> <span class="o">=</span>
<span class="nb">list</span><span class="p">(</span><span
class="n">state</span><span class="o">.</span><span
class="n">read</span><span class="p">())</span>
+<span class="n">process_values</span><span
class="p">(</span><span class="n">all_elements</span><span
class="p">)</span>
+<span class="n">state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;Bag state
pardo&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">BagStateDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
<h3 id="deferred-state-reads">11.2 Deferred state reads</h3>
<p>When a <code>DoFn</code> contains multiple state specifications,
reading each one in order can be slow. Calling the <code>read()</code>
function
on a state can cause the runner to perform a blocking read. Performing
multiple blocking reads in sequence adds latency
@@ -5420,12 +5443,30 @@ allows for event-time aggregations.</p>
</span><span class="c1"></span> <span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-python>
+<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">EventTimerDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
+<span class="n">TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;timer&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">WATERMARK</span><span class="p">)</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
+<span class="n">element_pair</span><span class="p">,</span>
+<span class="n">t</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimestampParam</span><span class="p">,</span>
+<span class="nb">buffer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">),</span>
+<span class="n">timer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimerParam</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)):</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="n">element_pair</span><span class="p">[</span><span
class="mi">1</span><span class="p">])</span>
+<span class="c1"># Set an event-time timer to the element
timestamp.</span>
+<span class="n">timer</span><span class="o">.</span><span
class="n">set</span><span class="p">(</span><span
class="n">t</span><span class="p">)</span>
+<span class="nd">@on_timer</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)</span>
+<span class="k">def</span> <span
class="nf">expiry_callback</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span> <span
class="nb">buffer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">)):</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;EventTime timer
pardo&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">EventTimerDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
<h4 id="processing-time-timers">11.3.2 Processing-time timers</h4>
<p>Processing-time timers fire when the real wall-clock time passes. This
is often used to create larger batches of data
before processing. It can also be used to schedule events that should occur at
a specific time. Just like with
event-time timers, processing-time timers are per key - each key has a
separate copy of the timer.</p>
<p>While processing-time timers can be set to an absolute timestamp, it is
very common to set them to an offset relative
-to the current time. The <code>Timer.offset</code> and
<code>Timer.setRelative</code> methods can be used to accomplish
this.</p>
+to the current time. In Java, the <code>Timer.offset</code> and
<code>Timer.setRelative</code> methods can be used to accomplish
this.</p>
<div class=language-java>
<div class="highlight"><pre class="chroma"><code
class="language-java" data-lang="java"><span
class="n">PCollection</span><span class="o">&lt;</span><span
class="n">KV</span><span class="o">&lt;</span><span
class="n">String</span><span class="o">,</span> <span
class="n">ValueT</span><span class="o">&gt;&gt;</span>
<span class="n">perUser</span> <span class="o">=</span> <span
class="n">readPerUser</ [...]
<span class="n">perUser</span><span class="o">.</span><span
class="na">apply</span><span class="o">(</span><span
class="n">ParDo</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="k">new</span> <span class="n">DoFn</span><span
class="o">&lt;</span><span class="n">KV</span><span
class="o">&lt;</span><span class="n">String</span><span
class="o">,</span [...]
@@ -5440,6 +5481,24 @@ to the current time. The <code>Timer.offset</code>
and <code>Timer.setR
</span><span class="c1"></span> <span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-python>
+<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">ProcessingTimerDoFn</span><span
class="p">(</span><span class="n">DoFn</span><span
class="p">):</span>
+<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
+<span class="n">TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;timer&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">REAL_TIME</span><span class="p">)</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
+<span class="n">element_pair</span><span class="p">,</span>
+<span class="nb">buffer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">),</span>
+<span class="n">timer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimerParam</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)):</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span
class="n">element_pair</span><span class="p">[</span><span
class="mi">1</span><span class="p">])</span>
+<span class="c1"># Set a timer to go off 30 seconds in the future.</span>
+<span class="n">timer</span><span class="o">.</span><span
class="n">set</span><span class="p">(</span><span
class="n">Timestamp</span><span class="o">.</span><span
class="n">now</span><span class="p">()</span> <span
class="o">+</span> <span class="n">Duration</span><span
class="p">(</span><span class="n">seconds</span><span
class="o">=</span><span class="mi">30</span><span
class="p">))</span>
+<span class="nd">@on_timer</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)</span>
+<span class="k">def</span> <span
class="nf">expiry_callback</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span> <span
class="nb">buffer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">)):</span>
+<span class="c1"># Process timer.</span>
+<span class="nb">buffer</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;ProcessingTime
timer pardo&#39;</span> <span class="o">&gt;&gt;</span>
<span class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">ProcessingTimerDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
<h4 id="dynamic-timer-tags">11.3.3 Dynamic timer tags</h4>
<p>Beam also supports dynamically setting a timer tag using
<code>TimerMap</code>. This allows for setting multiple different timers
in a <code>DoFn</code> and allowing for the timer tags to be dynamically
chosen - e.g. based on data in the input elements. A
@@ -5462,6 +5521,9 @@ id, and timers in different timer families are
independent.</p>
<span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-python>
+<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="n">To</span>
<span class="n">be</span> <span class="n">supported</span><span
class="p">,</span> <span class="n">See</span> <span
class="n">BEAM</span><span class="o">-</span><span
class="mi">9602</span></code></pre></div>
+</div>
<h4 id="timer-output-timestamps">11.3.4 Timer output timestamps</h4>
<p>By default, event-time timers will hold the output watermark of the
<code>ParDo</code> to the timestamp of the timer. This means
that if a timer is set to 12pm, any windowed aggregations or event-time timers
later in the pipeline graph that finish<br>
@@ -5560,7 +5622,7 @@ performance. There are two common strategies for garbage
collecting state.</p
<p>All state and timers for a key are scoped to the window they are in. This
means that depending on the timestamp of the
input element the ParDo will see different values for the state depending on
the window that element falls into. In
addition, once the input watermark passes the end of the window, the runner
should garbage collect all state for that
-window. (note: if allowed lateness is set to a positive value for the window,
the runner must wait for the watemark to
+window. (note: if allowed lateness is set to a positive value for the window,
the runner must wait for the watermark to
pass the end of the window plus the allowed lateness before garbage collecting
state). This can be used as a
garbage-collection strategy.</p>
<p>For example, given the following:</p>
@@ -5577,6 +5639,17 @@ garbage-collection strategy.</p>
</span><span class="c1"></span> <span class="o">}</span>
<span class="o">}));</span></code></pre></div>
</div>
+<div class=language-python>
+<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">StateDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
+<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;buffer&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
+<span class="n">element_pair</span><span class="p">,</span>
+<span class="nb">buffer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">)):</span>
+<span class="o">...</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span
class="s1">&#39;Windowing&#39;</span> <span
class="o">&gt;&gt;</span> <span class="n">beam</span><span
class="o">.</span><span class="n">WindowInto</span><span
class="p">(</span><span class="n">FixedWindows</span><span
class="p">(</span><span class="mi">60</span> <span
class="o">*</span> <span class="mi">60</span> <span
class="o">*</span> <span class="mi">24< [...]
+<span class="o">|</span> <span
class="s1">&#39;DoFn&#39;</span> <span
class="o">&gt;&gt;</span> <span class="n">beam</span><span
class="o">.</span><span class="n">ParDo</span><span
class="p">(</span><span class="n">StateDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
<p>This <code>ParDo</code> stores state per day. Once the pipeline is
done processing data for a given day, all the state for that
day is garbage collected.</p>
<h5 id="using-timers-for-garbage-collection">11.4.1 <strong>Using timers
for garbage collection</strong></h5>
@@ -5615,6 +5688,33 @@ This can be done by updating a timer that garbage
collects state. For example<
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
</div>
+<div class=language-python>
+<div class="highlight"><pre class="chroma"><code
class="language-python" data-lang="python"><span class="k">class</span>
<span class="nc">UserDoFn</span><span class="p">(</span><span
class="n">DoFn</span><span class="p">):</span>
+<span class="n">ALL_ELEMENTS</span> <span class="o">=</span>
<span class="n">BagStateSpec</span><span class="p">(</span><span
class="s1">&#39;state&#39;</span><span class="p">,</span>
<span class="n">coders</span><span class="o">.</span><span
class="n">VarIntCoder</span><span class="p">())</span>
+<span class="n">MAX_TIMESTAMP</span> <span class="o">=</span>
<span class="n">CombiningValueStateSpec</span><span
class="p">(</span><span
class="s1">&#39;max_timestamp_seen&#39;</span><span
class="p">,</span> <span class="nb">max</span><span
class="p">)</span>
+<span class="n">TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s1">&#39;gc-timer&#39;</span><span class="p">,</span>
<span class="n">TimeDomain</span><span class="o">.</span><span
class="n">WATERMARK</span><span class="p">)</span>
+<span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span
class="p">,</span>
+<span class="n">element</span><span class="p">,</span>
+<span class="n">t</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimestampParam</span><span class="p">,</span>
+<span class="n">state</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">),</span>
+<span class="n">max_timestamp</span> <span class="o">=</span>
<span class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">MAX_TIMESTAMP</span><span class="p">),</span>
+<span class="n">timer</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">TimerParam</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)):</span>
+<span class="n">update_state</span><span
class="p">(</span><span class="n">state</span><span
class="p">,</span> <span class="n">element</span><span
class="p">)</span>
+<span class="n">max_timestamp</span><span
class="o">.</span><span class="n">add</span><span
class="p">(</span><span class="n">t</span><span
class="o">.</span><span class="n">micros</span><span
class="p">)</span>
+<span class="c1"># Set the timer to be one hour after the maximum timestamp
seen. This will keep overwriting the same timer, so </span>
+<span class="c1"># as long as there is activity on this key the state will
stay active. Once the key goes inactive for one hour&#39;s</span>
+<span class="c1"># worth of event time (as measured by the watermark), then
the gc timer will fire.</span>
+<span class="n">expiration_time</span> <span class="o">=</span>
<span class="n">Timestamp</span><span class="p">(</span><span
class="n">micros</span><span class="o">=</span><span
class="n">max_timestamp</span><span class="o">.</span><span
class="n">read</span><span class="p">())</span> <span
class="o">+</span> <span class="n">Duration</span><span
class="p">(</span><span class="n">seconds</span><span cl [...]
+<span class="n">timer</span><span class="o">.</span><span
class="n">set</span><span class="p">(</span><span
class="n">expiration_time</span><span class="p">)</span>
+<span class="nd">@on_timer</span><span class="p">(</span><span
class="n">TIMER</span><span class="p">)</span>
+<span class="k">def</span> <span
class="nf">expiry_callback</span><span class="p">(</span><span
class="bp">self</span><span class="p">,</span>
+<span class="n">state</span> <span class="o">=</span> <span
class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">ALL_ELEMENTS</span><span class="p">),</span>
+<span class="n">max_timestamp</span> <span class="o">=</span>
<span class="n">DoFn</span><span class="o">.</span><span
class="n">StateParam</span><span class="p">(</span><span
class="n">MAX_TIMESTAMP</span><span class="p">)):</span>
+<span class="n">state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+<span class="n">max_timestamp</span><span
class="o">.</span><span class="n">clear</span><span
class="p">()</span>
+<span class="n">_</span> <span class="o">=</span> <span
class="p">(</span><span class="n">p</span> <span
class="o">|</span> <span class="s1">&#39;Read per
user&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">ReadPerUser</span><span class="p">()</span>
+<span class="o">|</span> <span class="s1">&#39;User
DoFn&#39;</span> <span class="o">&gt;&gt;</span> <span
class="n">beam</span><span class="o">.</span><span
class="n">ParDo</span><span class="p">(</span><span
class="n">UserDoFn</span><span
class="p">()))</span></code></pre></div>
+</div>
<h3 id="state-timers-examples">11.5 State and timers examples</h3>
<p>Following are some example uses of state and timers.</p>
<h4 id="joining-clicks-and-views">11.5.1. Joining clicks and views</h4>
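The garbage-collection pattern in the Python `UserDoFn` example above (keep a running maximum of element timestamps, keep re-setting a single event-time timer to that maximum plus one hour, and clear state when the timer fires) can be illustrated without a runner. The sketch below is a plain-Python toy model under stated assumptions, not Beam's API: `ToyStateStore`, `process`, and `advance_watermark` are hypothetical names, timestamps are integer seconds, windows are plain labels, and a timer is assumed to fire once the watermark reaches its timestamp.

```python
from collections import defaultdict

ONE_HOUR = 60 * 60  # seconds; timestamps in this toy model are integer seconds


class ToyStateStore:
    """Hypothetical in-memory stand-in for a runner's keyed state (not Beam's API).

    State is scoped to a (key, window) pair, mirroring how the docs describe
    Beam scoping state to the current key and window.
    """

    def __init__(self):
        self.buffers = defaultdict(list)  # plays the role of a BagState
        self.max_timestamp = {}           # plays the role of CombiningValueState(max)
        self.gc_timers = {}               # one event-time gc timer per (key, window)

    def process(self, key, window, value, timestamp):
        scope = (key, window)
        self.buffers[scope].append(value)
        self.max_timestamp[scope] = max(self.max_timestamp.get(scope, timestamp), timestamp)
        # Setting the timer again overwrites the previous one, so an active key
        # keeps pushing its garbage-collection time forward.
        self.gc_timers[scope] = self.max_timestamp[scope] + ONE_HOUR

    def advance_watermark(self, watermark):
        """Fire every timer at or before `watermark` and clear that scope's state."""
        fired = sorted(s for s, t in self.gc_timers.items() if t <= watermark)
        for scope in fired:
            del self.gc_timers[scope]
            self.buffers.pop(scope, None)
            self.max_timestamp.pop(scope, None)
        return fired


store = ToyStateStore()
store.process("alice", "day-1", 5, timestamp=100)
store.process("bob", "day-1", 1, timestamp=150)
store.process("alice", "day-1", 7, timestamp=200)    # pushes alice's timer to 3800
store.process("alice", "day-2", 9, timestamp=86500)  # new window, so fresh state

print(store.advance_watermark(3760))      # [('bob', 'day-1')]
print(store.buffers[("alice", "day-1")])  # [5, 7]
```

Because every new element for a key moves the same timer forward, only keys that have been idle for an hour of event time lose their state; a key seen in a new window starts from empty state regardless.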
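The ordering caveat in the stateful-processing text above (elements in the input PCollection have no guaranteed order, and DirectRunner tests shuffle them) can be illustrated with a small plain-Python check. This is a hypothetical sketch, not Beam code: `naive_latest`, `resilient_latest`, and `same_result_for_all_orders` are illustrative names, events are assumed to be `(key, value, timestamp)` tuples, and timestamps are assumed to be distinct within a key. Exhaustively permuting a small input approximates what a shuffling test runner exercises.

```python
from itertools import permutations


def naive_latest(events):
    """Order-DEPENDENT state machine: keeps whichever value arrives last per key."""
    state = {}
    for key, value, _ts in events:
        state[key] = value
    return state


def resilient_latest(events):
    """Order-resilient: keeps the value with the greatest event timestamp per key."""
    state = {}  # key -> (timestamp, value)
    for key, value, ts in events:
        if key not in state or ts > state[key][0]:
            state[key] = (ts, value)
    return {key: value for key, (_ts, value) in state.items()}


def same_result_for_all_orders(fn, events):
    """Run `fn` on every ordering of `events`; True only if all outputs agree."""
    results = {tuple(sorted(fn(list(order)).items())) for order in permutations(events)}
    return len(results) == 1


events = [("user1", "login", 10), ("user1", "click", 20), ("user2", "login", 15)]
print(same_result_for_all_orders(naive_latest, events))      # False
print(same_result_for_all_orders(resilient_latest, events))  # True
```

The naive version's output depends on which `user1` event happens to arrive last, which is exactly the kind of bug a shuffled-order test is meant to surface.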
diff --git
a/website/generated-content/documentation/programming-guide/index.html
b/website/generated-content/documentation/programming-guide/index.html
index 39612c3..d24661c 100644
--- a/website/generated-content/documentation/programming-guide/index.html
+++ b/website/generated-content/documentation/programming-guide/index.html
@@ -2223,13 +2223,15 @@ is modeled as a <code>PCollection<KV<K,
V>></code>. A <code>ParDo</code> p
the <code>ParDo</code> these state variables can be used to write or update
state for the current key or to read previous state
written for that key. State is always fully scoped only to the current
processing key.</p><p>Windowing can still be used together with stateful
processing. All state for a key is scoped to the current window. This
means that the first time a key is seen for a given window any state reads
will return empty, and that a runner can
-garbage collect state when a window is completed. It’s also often useful
to use Beam’s windowed aggegations prior to
+garbage collect state when a window is completed. It’s also often useful
to use Beam’s windowed aggregations prior to
the stateful operator. For example, using a combiner to preaggregate data, and
then storing aggregated data inside of
state. Merging windows are not currently supported when using state and
timers.</p><p>Sometimes stateful processing is used to implement state-machine
style processing inside a <code>DoFn</code>. When doing this,
care must be taken to remember that the elements in the input PCollection have no
guaranteed order and to ensure that the
program logic is resilient to this. Unit tests written using the DirectRunner
will shuffle the order of element
-processing, and are recommended to test for correctness.</p><p>In Java DoFn
declares states to be accessed by creating final <code>StateSpec</code> member
variables representing each state. Each
+processing, and are recommended to test for correctness.</p><p
class=language-java>In Java, a DoFn declares the states it accesses by creating
final <code>StateSpec</code> member variables representing each state. Each
state must be named using the <code>StateId</code> annotation; this name is
unique to a ParDo in the graph and has no relation
+to other nodes in the graph. A <code>DoFn</code> can declare multiple state
variables.</p><p class=language-py>In Python, a DoFn declares the states it
accesses by creating <code>StateSpec</code> class member variables representing
each state. Each
+<code>StateSpec</code> is initialized with a name; this name is unique to a
ParDo in the graph and has no relation
to other nodes in the graph. A <code>DoFn</code> can declare multiple state
variables.</p><h3 id=types-of-state>11.1 Types of state</h3><p>Beam provides
several types of state:</p><h4 id=valuestate>ValueState</h4><p>A ValueState is
a scalar state value. For each key in the input, a ValueState will store a
typed value that can be
read and modified inside the DoFn’s <code>@ProcessElement</code> or
<code>@OnTimer</code> methods. If the type of the ValueState has a coder
registered, then Beam will automatically infer the coder for the state value.
Otherwise, a coder can be explicitly
@@ -2258,7 +2260,14 @@ accumulates the number of elements seen.</p><div
class=language-java><div class=
<span class=nd>@ProcessElement</span> <span class=kd>public</span> <span
class=kt>void</span> <span class=nf>process</span><span class=o>(</span><span
class=nd>@StateId</span><span class=o>(</span><span
class=s>"state"</span><span class=o>)</span> <span
class=n>ValueState</span><span class=o><</span><span
class=n>Integer</span><span class=o>></span> <span class=n>state</span><span
class=o>)</span> <span class=o>{</span>
<span class=n>state</span><span class=o>.</span><span
class=na>add</span><span class=o>(</span><span class=n>1</span><span
class=o>);</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4
id=bagstate>BagState</h4><p>A common use case for state is to accumulate
multiple elements. <code>BagState</code> allows for accumulating an unordered
set
+<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>CombiningStateDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+ <span class=n>SUM_TOTAL</span> <span class=o>=</span> <span
class=n>CombiningValueStateSpec</span><span class=p>(</span><span
class=s1>'total'</span><span class=p>,</span> <span
class=nb>sum</span><span class=p>)</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=n>element</span><span class=p>,</span> <span class=n>state</span><span
class=o>=</span><span class=n>DoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>SUM_TOTAL</span><span class=p>)):</span>
+ <span class=n>state</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=mi>1</span><span
class=p>)</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'Combine state
pardo'</span> <span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>CombiningStateDoFn</span><span
class=p>()))</span></code></pre></div></div><h4 id=bagstate>BagState</h4><p>A
common use case for state is to accumulate multiple elements.
<code>BagState</code> allows for accumulating an unordered set
of elements. Elements can be added to the collection without
first reading the entire collection, which is an efficiency gain. In addition, runners that
support paged reads can allow individual
bags to be larger than available memory.</p><div class=language-java><div
class=highlight><pre class=chroma><code class=language-java
data-lang=java><span class=n>PCollection</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>></span> <span
class=n>perUser</span> <span class=o>=</span> <span
class=n>readPerUser</span><span class=o>();</span>
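To make the difference between combining state and bag state concrete, here is a rough plain-Python model of what each one keeps per key. It is a sketch under stated assumptions, not the Beam state API. `CombiningCell`, `BagCell`, and `run` are hypothetical names. A real runner keeps this state in its own backend, not in Python dicts.

```python
# Illustrative model only (plain Python, not the Beam state API).
from collections import defaultdict


class CombiningCell:
    """One accumulator per key, similar in spirit to CombiningValueStateSpec(..., sum)."""

    def __init__(self, combine=sum):
        self._combine = combine
        self._acc = None

    def add(self, value):
        # Fold the new value into the accumulator; the inputs are not retained.
        if self._acc is None:
            self._acc = value
        else:
            self._acc = self._combine([self._acc, value])

    def read(self):
        return self._acc


class BagCell:
    """Append-only buffer per key, similar in spirit to BagStateSpec."""

    def __init__(self):
        self._items = []

    def add(self, value):
        # A blind append: existing contents are never read here.
        self._items.append(value)

    def read(self):
        return list(self._items)

    def clear(self):
        self._items = []


def run(elements):
    """Route (key, value) pairs to per-key cells, as a stateful ParDo would."""
    counts = defaultdict(CombiningCell)
    bags = defaultdict(BagCell)
    for key, value in elements:
        counts[key].add(1)    # mirrors state.add(1) in CombiningStateDoFn
        bags[key].add(value)  # mirrors state.add(element_pair[1]) in BagStateDoFn
    return ({k: c.read() for k, c in counts.items()},
            {k: b.read() for k, b in bags.items()})
```

Beam makes no ordering guarantee for `BagState`. The Python list in this model merely happens to preserve insertion order.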
@@ -2277,7 +2286,18 @@ bags larger than available memory.</p><div
class=language-java><div class=highli
<span class=n>state</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span> <span class=c1>// Clear the
state for this key.
</span><span class=c1></span> <span class=o>}</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h3
id=deferred-state-reads>11.2 Deferred state reads</h3><p>When a
<code>DoFn</code> contains multiple state specifications, reading each one in
order can be slow. Calling the <code>read()</code> function
+<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>BagStateDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+ <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=n>element_pair</span><span class=p>,</span> <span
class=n>state</span><span class=o>=</span><span class=n>DoFn</span><span
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+ <span class=n>state</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+ <span class=k>if</span> <span class=n>should_fetch</span><span
class=p>():</span>
+ <span class=n>all_elements</span> <span class=o>=</span> <span
class=nb>list</span><span class=p>(</span><span class=n>state</span><span
class=o>.</span><span class=n>read</span><span class=p>())</span>
+ <span class=n>process_values</span><span class=p>(</span><span
class=n>all_elements</span><span class=p>)</span>
+ <span class=n>state</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'Bag state pardo'</span>
<span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>BagStateDoFn</span><span
class=p>()))</span></code></pre></div></div><h3 id=deferred-state-reads>11.2
Deferred state reads</h3><p>When a <code>DoFn</code> contains multiple state
specifications, reading each one in order can be slow. Calling the
<code>read()</code> function
on a state can cause the runner to perform a blocking read. Performing
multiple blocking reads in sequence adds latency
to element processing. If you know that a state will always be read, you can
annotate it as <code>@AlwaysFetched</code>, and then the
runner can prefetch all of the states necessary. For example:</p><div
class=language-java><div class=highlight><pre class=chroma><code
class=language-java data-lang=java><span class=n>PCollection</span><span
class=o><</span><span class=n>KV</span><span class=o><</span><span
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span
class=o>>></span> <span class=n>perUser</span> <span class=o>=</span>
<span class=n>readPerUser</span><span class=o>();</span>
@@ -2313,7 +2333,7 @@ be read in the future, allowing multiple state reads to
be batched together.</p>
<span class=o>}</span>
<span class=c1>// The runner can now batch all three states into a single
read, reducing latency.
-</span><span class=c1></span> <span class=n>processState1</span><span
class=o>(</span><span class=n>state1</span><span class=o>.</span><span
class=na>read</span><span class=o>());</span>
+</span><span class=c1></span> <span class=n>processState1</span><span
class=o>(</span><span class=n>state1</span><span class=o>.</span><span
class=na>read</span><span class=o>());</span>
<span class=n>processState2</span><span class=o>(</span><span
class=n>state2</span><span class=o>.</span><span class=na>read</span><span
class=o>());</span>
<span class=n>processState3</span><span class=o>(</span><span
class=n>state3</span><span class=o>.</span><span class=na>read</span><span
class=o>());</span>
<span class=o>}</span>
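The latency argument for deferred reads can be illustrated with a toy state backend that counts blocking round trips. `FakeStateBackend`, `sequential`, and `batched` are hypothetical names used only for this sketch. Whether a given runner actually batches prefetched reads, and how, is runner-specific.

```python
# Illustrative model only: a fake state backend that counts blocking round trips.
class FakeStateBackend:
    def __init__(self, data):
        self._data = dict(data)
        self.round_trips = 0

    def read(self, state_id):
        # Each individual read is one blocking round trip.
        self.round_trips += 1
        return self._data.get(state_id)

    def read_many(self, state_ids):
        # One round trip that fetches several states together, which is the
        # kind of batching that prefetching makes possible.
        self.round_trips += 1
        return {sid: self._data.get(sid) for sid in state_ids}


def sequential(backend, state_ids):
    """Read states one after another, each read blocking on its own."""
    return [backend.read(sid) for sid in state_ids]


def batched(backend, state_ids):
    """Fetch all states up front, then consume the prefetched values."""
    fetched = backend.read_many(state_ids)
    return [fetched[sid] for sid in state_ids]
```

With three states, the sequential version pays three round trips and the batched version pays one.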
@@ -2340,10 +2360,28 @@ allows for event-time aggregations.</p><div
class=language-java><div class=highl
<span class=nd>@OnTimer</span><span class=o>(</span><span
class=s>"timer"</span><span class=o>)</span> <span
class=kd>public</span> <span class=kt>void</span> <span
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
<span class=c1>//Process timer.
</span><span class=c1></span> <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4
id=processing-time-timers>11.3.2 Processing-time timers</h4><p>Processing-time
timers fire when the real wall-clock time passes. This is often used to create
larger batches of data
+<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>EventTimerDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+ <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
+ <span class=n>TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'timer'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>WATERMARK</span><span class=p>)</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>element_pair</span><span class=p>,</span>
+ <span class=n>t</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span
class=n>TimestampParam</span><span class=p>,</span>
+ <span class=nb>buffer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+ <span class=n>timer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+ <span class=c1># Set an event-time timer to the element timestamp.</span>
+ <span class=n>timer</span><span class=o>.</span><span
class=n>set</span><span class=p>(</span><span class=n>t</span><span
class=p>)</span>
+
+ <span class=nd>@on_timer</span><span class=p>(</span><span
class=n>TIMER</span><span class=p>)</span>
+ <span class=k>def</span> <span class=nf>expiry_callback</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'EventTime timer
pardo'</span> <span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>EventTimerDoFn</span><span
class=p>()))</span></code></pre></div></div><h4
id=processing-time-timers>11.3.2 Processing-time timers</h4><p>Processing-time
timers fire when the real wall-clock time passes. This is often used to create
larger batches of data
before processing. It can also be used to schedule events that should occur at
a specific time. Just like with
event-time timers, processing-time timers are per key; each key has a
separate copy of the timer.</p><p>While processing-time timers can be set to an
absolute timestamp, it is very common to set them to an offset relative
-to the current time. The <code>Timer.offset</code> and
<code>Timer.setRelative</code> methods can be used to accomplish this.</p><div
class=language-java><div class=highlight><pre class=chroma><code
class=language-java data-lang=java><span class=n>PCollection</span><span
class=o><</span><span class=n>KV</span><span class=o><</span><span
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span
class=o>>></span> <span class=n>perUser</span> <span class=o>=</ [...]
+to the current time. In Java, the <code>Timer.offset</code> and
<code>Timer.setRelative</code> methods can be used to accomplish this.</p><div
class=language-java><div class=highlight><pre class=chroma><code
class=language-java data-lang=java><span class=n>PCollection</span><span
class=o><</span><span class=n>KV</span><span class=o><</span><span
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span
class=o>>></span> <span class=n>perUser</span> <span cl [...]
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>ParDo</span><span
class=o>.</span><span class=na>of</span><span class=o>(</span><span
class=k>new</span> <span class=n>DoFn</span><span class=o><</span><span
class=n>KV</span><span class=o><</span><span class=n>String</span><span
class=o>,</span> <span class=n>ValueT</span><span class=o>>,</span> <span
class=n>OutputT</span><span class=o>>()</span> <span class=o>{</span>
<span class=nd>@TimerId</span><span class=o>(</span><span
class=s>"timer"</span><span class=o>)</span> <span
class=kd>private</span> <span class=kd>final</span> <span
class=n>TimerSpec</span> <span class=n>timer</span> <span class=o>=</span>
<span class=n>TimerSpecs</span><span class=o>.</span><span
class=na>timer</span><span class=o>(</span><span class=n>TimeDomain</span><span
class=o>.</span><span class=na>PROCESSING_TIME</span><span class=o>);</span>
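The event-time timer behaviour of `EventTimerDoFn` can be approximated in plain Python. `EventTimeModel` is a hypothetical stand-in, not Beam code. It keeps one buffer and one timer per key and overwrites the timer on each element. When the watermark moves past a key's timer, it flushes that key's buffer. The strict "past" comparison is a modelling assumption.

```python
# Illustrative model only: EventTimerDoFn-style behaviour in plain Python.
class EventTimeModel:
    def __init__(self):
        self.buffers = {}   # key -> list of buffered values
        self.timers = {}    # key -> timer timestamp (one timer per key)
        self.flushed = []   # (key, values) emitted by the timer callback

    def process(self, key, value, timestamp):
        self.buffers.setdefault(key, []).append(value)
        # Setting the timer again overwrites the previous timestamp for this key.
        self.timers[key] = timestamp

    def advance_watermark(self, watermark):
        # Assumption: a timer fires once the watermark is strictly past it.
        due = sorted(k for k, ts in self.timers.items() if ts < watermark)
        for key in due:
            del self.timers[key]
            # The callback emits the buffered values and clears the buffer.
            self.flushed.append((key, self.buffers.pop(key, [])))
```

For example, after elements for key `"a"` at timestamps 10 and 15, the timer sits at 15. Advancing the watermark to 16 flushes both buffered values for `"a"` at once.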
@@ -2356,7 +2394,25 @@ to the current time. The <code>Timer.offset</code> and
<code>Timer.setRelative</
<span class=nd>@OnTimer</span><span class=o>(</span><span
class=s>"timer"</span><span class=o>)</span> <span
class=kd>public</span> <span class=kt>void</span> <span
class=nf>onTimer</span><span class=o>()</span> <span class=o>{</span>
<span class=c1>//Process timer.
</span><span class=c1></span> <span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4
id=dynamic-timer-tags>11.3.3 Dynamic timer tags</h4><p>Beam also supports
dynamically setting a timer tag using <code>TimerMap</code>. This allows for
setting multiple different timers
+<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>ProcessingTimerDoFn</span><span class=p>(</span><span
class=n>DoFn</span><span class=p>):</span>
+ <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
+ <span class=n>TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'timer'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>REAL_TIME</span><span class=p>)</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>element_pair</span><span class=p>,</span>
+ <span class=nb>buffer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+ <span class=n>timer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=n>element_pair</span><span
class=p>[</span><span class=mi>1</span><span class=p>])</span>
+ <span class=c1># Set a timer to go off 30 seconds in the future.</span>
+ <span class=n>timer</span><span class=o>.</span><span
class=n>set</span><span class=p>(</span><span class=n>Timestamp</span><span
class=o>.</span><span class=n>now</span><span class=p>()</span> <span
class=o>+</span> <span class=n>Duration</span><span class=p>(</span><span
class=n>seconds</span><span class=o>=</span><span class=mi>30</span><span
class=p>))</span>
+
+ <span class=nd>@on_timer</span><span class=p>(</span><span
class=n>TIMER</span><span class=p>)</span>
+ <span class=k>def</span> <span class=nf>expiry_callback</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span> <span
class=nb>buffer</span> <span class=o>=</span> <span class=n>DoFn</span><span
class=o>.</span><span class=n>StateParam</span><span class=p>(</span><span
class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+ <span class=c1># Process timer.</span>
+ <span class=nb>buffer</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'ProcessingTime timer
pardo'</span> <span class=o>>></span> <span class=n>beam</span><span
class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span
class=n>ProcessingTimerDoFn</span><span
class=p>()))</span></code></pre></div></div><h4 id=dynamic-timer-tags>11.3.3
Dynamic timer tags</h4><p>Beam also supports dynamically setting a timer tag
using <code>TimerMap</code>. This allows setting multiple different timers
in a <code>DoFn</code>, with the timer tags chosen dynamically (for example,
based on data in the input elements). A
timer with a specific tag can only be set to a single timestamp, so setting
the timer again has the effect of
overwriting the previous expiration time for the timer with that tag. Each
<code>TimerMap</code> is identified with a timer family
@@ -2375,7 +2431,7 @@ id, and timers in different timer families are
independent.</p><div class=langua
<span class=nd>@OnTimerFamily</span><span class=o>(</span><span
class=s>"actionTimers"</span><span class=o>)</span> <span
class=kd>public</span> <span class=kt>void</span> <span
class=nf>onTimer</span><span class=o>(</span><span class=nd>@TimerId</span>
<span class=n>String</span> <span class=n>timerId</span><span class=o>)</span>
<span class=o>{</span>
<span class=n>LOG</span><span class=o>.</span><span
class=na>info</span><span class=o>(</span><span class=s>"Timer fired with
id "</span> <span class=o>+</span> <span class=n>timerId</span><span
class=o>);</span>
<span class=o>}</span>
-<span class=o>}));</span></code></pre></div></div><h4
id=timer-output-timestamps>11.3.4 Timer output timestamps</h4><p>By default,
event-time timers will hold the output watermark of the <code>ParDo</code> to
the timestamp of the timer. This means
+<span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=c1># Not yet supported in the Python SDK. See BEAM-9602.</span></code></pre></div></div><h4
id=timer-output-timestamps>11.3.4 Timer output timestamps</h4><p>By default,
event-time timers will hol [...]
that if a timer is set to 12pm, any windowed aggregations or event-time timers
later in the pipeline graph that finish after 12pm will not expire. The
timestamp of the timer is also the default output timestamp for the timer
callback. This
means that any elements output from the onTimer method will have a timestamp
equal to the timestamp of the timer firing.
For processing-time timers, the default output timestamp and watermark hold is
the value of the input watermark at the
@@ -2465,7 +2521,7 @@ past the timestamp of the minimum element. The following
code demonstrates this.
performance. There are two common strategies for garbage collecting
state.</p><h5 id=using-windows-for-garbage-collection>11.4.1 <strong>Using
windows for garbage collection</strong></h5><p>All state and timers for a key
are scoped to the window they are in. This means that the <code>ParDo</code> will see
different values for the state depending on which window the input element's
timestamp falls into. In
addition, once the input watermark passes the end of the window, the runner
should garbage collect all state for that
-window. (note: if allowed lateness is set to a positive value for the window,
the runner must wait for the watemark to
+window. (note: if allowed lateness is set to a positive value for the window,
the runner must wait for the watermark to
pass the end of the window plus the allowed lateness before garbage collecting
state). This can be used as a
garbage-collection strategy.</p><p>For example, given the following:</p><div
class=language-java><div class=highlight><pre class=chroma><code
class=language-java data-lang=java><span class=n>PCollection</span><span
class=o><</span><span class=n>KV</span><span class=o><</span><span
class=n>String</span><span class=o>,</span> <span class=n>ValueT</span><span
class=o>>></span> <span class=n>perUser</span> <span class=o>=</span>
<span class=n>readPerUser</span><span class=o>();</span>
<span class=n>perUser</span><span class=o>.</span><span
class=na>apply</span><span class=o>(</span><span class=n>Window</span><span
class=o>.</span><span class=na>into</span><span class=o>(</span><span
class=n>CalendarWindows</span><span class=o>.</span><span
class=na>days</span><span class=o>(</span><span class=n>1</span><span
class=o>)</span>
@@ -2477,7 +2533,17 @@ garbage-collection strategy.</p><p>For example, given
the following:</p><div cla
<span class=c1>// The state is scoped to a calendar day window.
That means that if the input timestamp ts is after
</span><span class=c1></span> <span class=c1>// midnight PST,
then a new copy of the state will be seen for the next day.
</span><span class=c1></span> <span class=o>}</span>
- <span class=o>}));</span></code></pre></div></div><p>This
<code>ParDo</code> stores state per day. Once the pipeline is done processing
data for a given day, all the state for that
+ <span class=o>}));</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>StateDoFn</span><span class=p>(</span><span class=n>DoFn</span><span
class=p>):</span>
+ <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'buffer'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>element_pair</span><span class=p>,</span>
+ <span class=nb>buffer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>)):</span>
+ <span class=o>...</span>
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'Windowing'</span> <span
class=o>>></span> <span class=n>beam</span><span class=o>.</span><span
class=n>WindowInto</span><span class=p>(</span><span
class=n>FixedWindows</span><span class=p>(</span><span class=mi>60</span> <span
class=o>*</span> <span class=mi>60</span> <span class=o>*</span> <span
class=mi>24</span><span class=p>))</span>
+ <span class=o>|</span> <span class=s1>'DoFn'</span> <span
class=o>>></span> <span class=n>beam</span><span class=o>.</span><span
class=n>ParDo</span><span class=p>(</span><span class=n>StateDoFn</span><span
class=p>()))</span></code></pre></div></div><p>This <code>ParDo</code> stores
state per day. Once the pipeline is done processing data for a given day, all
the state for that
day is garbage collected.</p><h5 id=using-timers-for-garbage-collection>11.4.2
<strong>Using timers for garbage collection</strong></h5><p>In some cases, it
is difficult to find a windowing strategy that models the desired
garbage-collection strategy. For
example, a common desire is to garbage collect state for a key once no
activity has been seen on the key for some time.
This can be done by updating a timer that garbage collects state. For
example:</p><div class=language-java><div class=highlight><pre
class=chroma><code class=language-java data-lang=java><span
class=n>PCollection</span><span class=o><</span><span class=n>KV</span><span
class=o><</span><span class=n>String</span><span class=o>,</span> <span
class=n>ValueT</span><span class=o>>></span> <span class=n>perUser</span>
<span class=o>=</span> <span class=n>readPerUser</span><span clas [...]
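Both garbage-collection strategies can be sketched in plain Python. `assign_window`, `WindowedState`, and `InactivityGc` are hypothetical names, not Beam APIs. The window assignment assumes `FixedWindows` with a zero offset. The timer comparison is a modelling assumption, not a statement about any specific runner.

```python
# Illustrative model of two state garbage-collection strategies
# (plain Python, not the Beam API).

DAY = 60 * 60 * 24    # window size, like FixedWindows(60 * 60 * 24)
GC_DELAY = 60 * 60    # one hour of event-time inactivity


def assign_window(ts, size=DAY):
    """Return the [start, end) fixed window containing ts (zero offset assumed)."""
    start = ts - ts % size
    return (start, start + size)


class WindowedState:
    """Strategy 1: state scoped to (key, window), dropped after the window closes."""

    def __init__(self, allowed_lateness=0):
        self.allowed_lateness = allowed_lateness
        self.state = {}  # (key, window) -> list of values

    def add(self, key, value, ts):
        # A new window for the same key starts with empty state.
        self.state.setdefault((key, assign_window(ts)), []).append(value)

    def advance_watermark(self, watermark):
        # Drop state once the watermark reaches window end + allowed lateness.
        expired = [kw for kw in self.state
                   if watermark >= kw[1][1] + self.allowed_lateness]
        for kw in expired:
            del self.state[kw]
        return len(expired)


class InactivityGc:
    """Strategy 2: a per-key timer pushed forward on every element."""

    def __init__(self):
        self.state = {}      # key -> list of values
        self.max_ts = {}     # key -> largest event timestamp seen
        self.gc_timer = {}   # key -> timer timestamp (one timer per key)

    def process(self, key, value, ts):
        self.state.setdefault(key, []).append(value)
        self.max_ts[key] = max(ts, self.max_ts.get(key, ts))
        # Re-setting the timer overwrites the previous one for this key.
        self.gc_timer[key] = self.max_ts[key] + GC_DELAY

    def advance_watermark(self, watermark):
        # Assumption: a timer fires once the watermark is strictly past it.
        fired = sorted(k for k, t in self.gc_timer.items() if t < watermark)
        for key in fired:
            del self.gc_timer[key]
            self.state.pop(key, None)
            self.max_ts.pop(key, None)
        return fired
```

The first strategy ties state lifetime to the window boundary. The second keys it to the last activity seen, so a busy key keeps its state while an idle key is cleaned up one hour of event time after its latest element.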
@@ -2514,7 +2580,36 @@ This can be done by updating a timer that garbage
collects state. For example</p
</span><span class=c1></span> <span class=n>state</span><span
class=o>.</span><span class=na>clear</span><span class=o>();</span>
<span class=n>maxTimestamp</span><span class=o>.</span><span
class=na>clear</span><span class=o>();</span>
<span class=o>}</span>
- <span class=o>}</span></code></pre></div></div><h3
id=state-timers-examples>11.5 State and timers examples</h3><p>Following are
some example uses of state and timers</p><h4
id=joining-clicks-and-views>11.5.1. Joining clicks and views</h4><p>In this
example, the pipeline is processing data from an e-commerce site’s home
page. There are two input streams:
+ <span class=o>}</span></code></pre></div></div><div
class=language-python><div class=highlight><pre class=chroma><code
class=language-python data-lang=python><span class=k>class</span> <span
class=nc>UserDoFn</span><span class=p>(</span><span class=n>DoFn</span><span
class=p>):</span>
+ <span class=n>ALL_ELEMENTS</span> <span class=o>=</span> <span
class=n>BagStateSpec</span><span class=p>(</span><span
class=s1>'state'</span><span class=p>,</span> <span
class=n>coders</span><span class=o>.</span><span
class=n>VarIntCoder</span><span class=p>())</span>
+ <span class=n>MAX_TIMESTAMP</span> <span class=o>=</span> <span
class=n>CombiningValueStateSpec</span><span class=p>(</span><span
class=s1>'max_timestamp_seen'</span><span class=p>,</span> <span
class=nb>max</span><span class=p>)</span>
+ <span class=n>TIMER</span> <span class=o>=</span> <span
class=n>TimerSpec</span><span class=p>(</span><span
class=s1>'gc-timer'</span><span class=p>,</span> <span
class=n>TimeDomain</span><span class=o>.</span><span
class=n>WATERMARK</span><span class=p>)</span>
+
+ <span class=k>def</span> <span class=nf>process</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>element</span><span class=p>,</span>
+ <span class=n>t</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span
class=n>TimestampParam</span><span class=p>,</span>
+ <span class=n>state</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+ <span class=n>max_timestamp</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>MAX_TIMESTAMP</span><span class=p>),</span>
+ <span class=n>timer</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>TimerParam</span><span
class=p>(</span><span class=n>TIMER</span><span class=p>)):</span>
+ <span class=n>update_state</span><span class=p>(</span><span
class=n>state</span><span class=p>,</span> <span class=n>element</span><span
class=p>)</span>
+ <span class=n>max_timestamp</span><span class=o>.</span><span
class=n>add</span><span class=p>(</span><span class=n>t</span><span
class=o>.</span><span class=n>micros</span><span class=p>)</span>
+
+ <span class=c1># Set the timer to be one hour after the maximum timestamp
seen. This will keep overwriting the same timer, so </span>
+ <span class=c1># as long as there is activity on this key the state will
stay active. Once the key goes inactive for one hour's</span>
+ <span class=c1># worth of event time (as measured by the watermark), then
the gc timer will fire.</span>
+ <span class=n>expiration_time</span> <span class=o>=</span> <span
class=n>Timestamp</span><span class=p>(</span><span class=n>micros</span><span
class=o>=</span><span class=n>max_timestamp</span><span class=o>.</span><span
class=n>read</span><span class=p>())</span> <span class=o>+</span> <span
class=n>Duration</span><span class=p>(</span><span class=n>seconds</span><span
class=o>=</span><span class=mi>60</span><span class=o>*</span><span
class=mi>60</span><span class=p>)</span>
+ <span class=n>timer</span><span class=o>.</span><span
class=n>set</span><span class=p>(</span><span
class=n>expiration_time</span><span class=p>)</span>
+
+ <span class=nd>@on_timer</span><span class=p>(</span><span
class=n>TIMER</span><span class=p>)</span>
+ <span class=k>def</span> <span class=nf>expiry_callback</span><span
class=p>(</span><span class=bp>self</span><span class=p>,</span>
+ <span class=n>state</span> <span class=o>=</span> <span
class=n>DoFn</span><span class=o>.</span><span class=n>StateParam</span><span
class=p>(</span><span class=n>ALL_ELEMENTS</span><span class=p>),</span>
+ <span class=n>max_timestamp</span> <span
class=o>=</span> <span class=n>DoFn</span><span class=o>.</span><span
class=n>StateParam</span><span class=p>(</span><span
class=n>MAX_TIMESTAMP</span><span class=p>)):</span>
+ <span class=n>state</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+ <span class=n>max_timestamp</span><span class=o>.</span><span
class=n>clear</span><span class=p>()</span>
+
+
+<span class=n>_</span> <span class=o>=</span> <span class=p>(</span><span
class=n>p</span> <span class=o>|</span> <span class=s1>'Read per
user'</span> <span class=o>>></span> <span
class=n>ReadPerUser</span><span class=p>()</span>
+ <span class=o>|</span> <span class=s1>'User DoFn'</span> <span
class=o>>></span> <span class=n>beam</span><span class=o>.</span><span
class=n>ParDo</span><span class=p>(</span><span class=n>UserDoFn</span><span
class=p>()))</span></code></pre></div></div><h3 id=state-timers-examples>11.5
State and timers examples</h3><p>The following are some example uses of state and
timers:</p><h4 id=joining-clicks-and-views>11.5.1 Joining clicks and
views</h4><p>In this example, the p [...]
a stream of views, representing suggested product links displayed to the user
on the home page, and a stream of
clicks, representing actual user clicks on these links. The goal of the
pipeline is to join click events with view
events, outputting a new joined event that contains information from both
events. Each link has a unique identifier
diff --git a/website/generated-content/get-started/index.xml
b/website/generated-content/get-started/index.xml
index b2b78eb..18c0423 100644
--- a/website/generated-content/get-started/index.xml
+++ b/website/generated-content/get-started/index.xml
@@ -1399,7 +1399,7 @@ environment&rsquo;s directories.</p>
<pre><code>PS&gt; python -m pip install
apache-beam</code></pre>
</div>
<h4 id="extra-requirements">Extra requirements</h4>
-<p>The above installation will not install all the extra dependencies for
using features like the Google Cloud Dataflow runner. Information on what extra
packages are required for different features are highlighted below. It is
possible to install multitple extra requirements using something like
<code>pip install apache-beam[feature1,feature2]</code>.</p>
+<p>The above installation will not install all the extra dependencies for
using features like the Google Cloud Dataflow runner. Information on which extra
packages are required for different features is highlighted below. It is
possible to install multiple extra requirements using something like
<code>pip install apache-beam[feature1,feature2]</code>.</p>
<ul>
<li><strong>Google Cloud Platform</strong>
<ul>
@@ -1414,6 +1414,12 @@ environment&rsquo;s directories.</p>
</li>
</ul>
</li>
+<li><strong>Amazon Web Services</strong>
+<ul>
+<li>Installation Command: <code>pip install
apache-beam[aws]</code></li>
+<li>Required for I/O connectors interfacing with AWS</li>
+</ul>
+</li>
<li><strong>Tests</strong>
<ul>
<li>Installation Command: <code>pip install
apache-beam[test]</code></li>
diff --git a/website/generated-content/get-started/quickstart-py/index.html
b/website/generated-content/get-started/quickstart-py/index.html
index 855b217..f26cc7f 100644
--- a/website/generated-content/get-started/quickstart-py/index.html
+++ b/website/generated-content/get-started/quickstart-py/index.html
@@ -9,7 +9,7 @@ administrative privileges.</p><div
class=shell-unix><pre><code>pip install --upg
<code>setuptools</code> is installed on your machine. If you do not have
<code>setuptools</code>
version 17.1 or newer, run the following command to install it.</p><div
class=shell-unix><pre><code>pip install --upgrade
setuptools</code></pre></div><div class=shell-PowerShell><pre><code>PS>
python -m pip install --upgrade setuptools</code></pre></div><h2
id=get-apache-beam>Get Apache Beam</h2><h3
id=create-and-activate-a-virtual-environment>Create and activate a virtual
environment</h3><p>A virtual environment is a directory tree containing its own
Python distribution. To create a [...]
Activating it sets some environment variables that point to the virtual
-environment’s directories.</p><p>To activate a virtual environment in
Bash, run:</p><div class=shell-unix><pre><code>.
/path/to/directory/bin/activate</code></pre></div><div
class=shell-PowerShell><pre><code>PS>
C:\path\to\directory\Scripts\activate.ps1</code></pre></div><p>That is, execute
the <code>activate</code> script under the virtual environment directory you
created.</p><p>For instructions using other shells, see the <a
href=https://virtualenv.pypa.io/en/stable/userguide [...]
+environment’s directories.</p><p>To activate a virtual environment in
Bash, run:</p><div class=shell-unix><pre><code>.
/path/to/directory/bin/activate</code></pre></div><div
class=shell-PowerShell><pre><code>PS>
C:\path\to\directory\Scripts\activate.ps1</code></pre></div><p>That is, execute
the <code>activate</code> script under the virtual environment directory you
created.</p><p>For instructions using other shells, see the <a
href=https://virtualenv.pypa.io/en/stable/userguide [...]
See https://beam.apache.org/roadmap/portability/#python-on-flink for more
information.</code></pre></div><div
class=runner-flink-cluster><pre><code>Currently, running wordcount.py on Flink
requires a full download of the Beam source code.
See https://beam.apache.org/documentation/runners/flink/ for more
information.</code></pre></div><div class=runner-spark><pre><code>Currently,
running wordcount.py on Spark requires a full download of the Beam source code.
See https://beam.apache.org/roadmap/portability/#python-on-spark for more
information.</code></pre></div><div class=runner-dataflow><pre><code># As part
of the initial setup, install Google Cloud Platform specific extra components.
Make sure you
diff --git a/website/generated-content/sitemap.xml b/website/generated-content/sitemap.xml
index c71c97b..aafbf27 100644
--- a/website/generated-content/sitemap.xml
+++ b/website/generated-content/sitemap.xml
@@ -1 +1 @@
-<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/python/</loc><lastmod>2020-05-28T15:14:36-07:00</lastmod></url><url><loc>/blog/
[...]
\ No newline at end of file
+<?xml version="1.0" encoding="utf-8" standalone="yes"?><urlset
xmlns="http://www.sitemaps.org/schemas/sitemap/0.9"
xmlns:xhtml="http://www.w3.org/1999/xhtml"><url><loc>/categories/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/blog/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/</loc><lastmod>2020-06-01T15:15:07-04:00</lastmod></url><url><loc>/categories/python/</loc><lastmod>2020-05-28T15:14:36-07:00</lastmod></url><url><loc>/blog/
[...]
\ No newline at end of file