This is an automated email from the ASF dual-hosted git repository.
git-site-role pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/asf-site by this push:
new 019240c Publishing website 2019/05/08 00:01:48 at commit 6d9214f
019240c is described below
commit 019240cd5e7fb658ac19903e27cfaafe1b3170bd
Author: jenkins <[email protected]>
AuthorDate: Wed May 8 00:01:48 2019 +0000
Publishing website 2019/05/08 00:01:48 at commit 6d9214f
---
.../blog/2017/08/28/timely-processing.html | 102 +++++++++++++++++++--
1 file changed, 92 insertions(+), 10 deletions(-)
diff --git a/website/generated-content/blog/2017/08/28/timely-processing.html b/website/generated-content/blog/2017/08/28/timely-processing.html
index 1466e86..5c25788 100644
--- a/website/generated-content/blog/2017/08/28/timely-processing.html
+++ b/website/generated-content/blog/2017/08/28/timely-processing.html
@@ -346,8 +346,13 @@ elements we have buffered. Here are the state cells in code:</p>
</code></pre>
</div>
-<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
-<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for
updates.</span>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="k">class</span> <span class="nc">StatefulBufferingFn</span><span
class="p">(</span><span class="n">beam</span><span class="o">.</span><span
class="n">DoFn</span><span class="p">):</span>
+
+ <span class="n">BUFFER_STATE</span> <span class="o">=</span> <span
class="n">BagStateSpec</span><span class="p">(</span><span
class="s">'buffer'</span><span class="p">,</span> <span
class="n">EventCoder</span><span class="p">())</span>
+
+ <span class="n">COUNT_STATE</span> <span class="o">=</span> <span
class="n">CombiningValueStateSpec</span><span class="p">(</span><span
class="s">'count'</span><span class="p">,</span>
+ <span
class="n">VarIntCoder</span><span class="p">(),</span>
+ <span class="n">combiners</span><span
class="o">.</span><span class="n">SumCombineFn</span><span class="p">())</span>
</code></pre>
</div>
@@ -384,7 +389,7 @@ events, and output.</p>
<span class="n">countState</span><span class="o">.</span><span
class="na">write</span><span class="o">(</span><span
class="n">count</span><span class="o">);</span>
<span class="n">bufferState</span><span class="o">.</span><span
class="na">add</span><span class="o">(</span><span
class="n">context</span><span class="o">.</span><span
class="na">element</span><span class="o">());</span>
- <span class="k">if</span> <span class="o">(</span><span
class="n">count</span> <span class="o">></span> <span
class="n">MAX_BUFFER_SIZE</span><span class="o">)</span> <span
class="o">{</span>
+ <span class="k">if</span> <span class="o">(</span><span
class="n">count</span> <span class="o">>=</span> <span
class="n">MAX_BUFFER_SIZE</span><span class="o">)</span> <span
class="o">{</span>
<span class="k">for</span> <span class="o">(</span><span
class="n">EnrichedEvent</span> <span class="n">enrichedEvent</span> <span
class="o">:</span> <span class="n">enrichEvents</span><span
class="o">(</span><span class="n">bufferState</span><span
class="o">.</span><span class="na">read</span><span class="o">()))</span> <span
class="o">{</span>
<span class="n">context</span><span class="o">.</span><span
class="na">output</span><span class="o">(</span><span
class="n">enrichedEvent</span><span class="o">);</span>
<span class="o">}</span>
@@ -398,8 +403,30 @@ events, and output.</p>
</code></pre>
</div>
-<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
-<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for
updates.</span>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="k">class</span> <span class="nc">StatefulBufferingFn</span><span
class="p">(</span><span class="n">beam</span><span class="o">.</span><span
class="n">DoFn</span><span class="p">):</span>
+
+ <span class="n">MAX_BUFFER_SIZE</span> <span class="o">=</span> <span
class="mi">500</span><span class="p">;</span>
+
+ <span class="n">BUFFER_STATE</span> <span class="o">=</span> <span
class="n">BagStateSpec</span><span class="p">(</span><span
class="s">'buffer'</span><span class="p">,</span> <span
class="n">EventCoder</span><span class="p">())</span>
+
+ <span class="n">COUNT_STATE</span> <span class="o">=</span> <span
class="n">CombiningValueStateSpec</span><span class="p">(</span><span
class="s">'count'</span><span class="p">,</span>
+ <span
class="n">VarIntCoder</span><span class="p">(),</span>
+ <span class="n">combiners</span><span
class="o">.</span><span class="n">SumCombineFn</span><span class="p">())</span>
+
+ <span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span
class="n">element</span><span class="p">,</span>
+ <span class="n">buffer_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">BUFFER_STATE</span><span class="p">),</span>
+ <span class="n">count_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">COUNT_STATE</span><span class="p">)):</span>
+
+ <span class="n">buffer_state</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span class="n">element</span><span
class="p">)</span>
+
+ <span class="n">count_state</span><span class="o">.</span><span
class="n">add</span><span class="p">(</span><span class="mi">1</span><span
class="p">)</span>
+ <span class="n">count</span> <span class="o">=</span> <span
class="n">count_state</span><span class="o">.</span><span
class="n">read</span><span class="p">()</span>
+
+ <span class="k">if</span> <span class="n">count</span> <span
class="o">>=</span> <span class="n">MAX_BUFFER_SIZE</span><span
class="p">:</span>
+ <span class="k">for</span> <span class="n">event</span> <span
class="ow">in</span> <span class="n">buffer_state</span><span
class="o">.</span><span class="n">read</span><span class="p">():</span>
+ <span class="k">yield</span> <span class="n">event</span>
+ <span class="n">count_state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+ <span class="n">buffer_state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
</code></pre>
</div>
@@ -469,14 +496,39 @@ any events remaining in the buffer are processed.</p>
<span class="k">for</span> <span class="o">(</span><span
class="n">EnrichedEvent</span> <span class="n">enrichedEvent</span> <span
class="o">:</span> <span class="n">enrichEvents</span><span
class="o">(</span><span class="n">bufferState</span><span
class="o">.</span><span class="na">read</span><span class="o">()))</span> <span
class="o">{</span>
<span class="n">context</span><span class="o">.</span><span
class="na">output</span><span class="o">(</span><span
class="n">enrichedEvent</span><span class="o">);</span>
<span class="o">}</span>
+ <span class="n">bufferState</span><span class="o">.</span><span
class="na">clear</span><span class="o">();</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre>
</div>
-<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
-<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for
updates.</span>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="k">class</span> <span class="nc">StatefulBufferingFn</span><span
class="p">(</span><span class="n">beam</span><span class="o">.</span><span
class="n">DoFn</span><span class="p">):</span>
+ <span class="err">…</span>
+
+ <span class="n">EXPIRY_TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s">'expiry'</span><span class="p">,</span> <span
class="n">TimeDomain</span><span class="o">.</span><span
class="n">WATERMARK</span><span class="p">)</span>
+
+ <span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span
class="n">element</span><span class="p">,</span>
+ <span class="n">w</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">WindowParam</span><span class="p">,</span>
+ <span class="n">buffer_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">BUFFER_STATE</span><span class="p">),</span>
+ <span class="n">count_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">COUNT_STATE</span><span class="p">),</span>
+ <span class="n">expiry_timer</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">TimerParam</span><span
class="p">(</span><span class="n">EXPIRY_TIMER</span><span class="p">)):</span>
+
+ <span class="n">expiry_timer</span><span class="o">.</span><span
class="nb">set</span><span class="p">(</span><span class="n">w</span><span
class="o">.</span><span class="n">end</span> <span class="o">+</span> <span
class="n">ALLOWED_LATENESS</span><span class="p">)</span>
+
+ <span class="err">…</span> <span class="n">same</span> <span
class="n">logic</span> <span class="k">as</span> <span class="n">above</span>
<span class="err">…</span>
+
+ <span class="nd">@on_timer</span><span class="p">(</span><span
class="n">EXPIRY_TIMER</span><span class="p">)</span>
+ <span class="k">def</span> <span class="nf">expiry</span><span
class="p">(</span><span class="bp">self</span><span class="p">,</span>
+ <span class="n">buffer_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">BUFFER_STATE</span><span class="p">),</span>
+ <span class="n">count_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">COUNT_STATE</span><span class="p">)):</span>
+ <span class="n">events</span> <span class="o">=</span> <span
class="n">buffer_state</span><span class="o">.</span><span
class="n">read</span><span class="p">()</span>
+
+ <span class="k">for</span> <span class="n">event</span> <span
class="ow">in</span> <span class="n">events</span><span class="p">:</span>
+ <span class="k">yield</span> <span class="n">event</span>
+
+ <span class="n">buffer_state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+ <span class="n">count_state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
</code></pre>
</div>
@@ -556,7 +608,7 @@ callback will fire and enrich and emit any buffered elements.</p>
<span class="kt">boolean</span> <span class="n">staleTimerSet</span> <span
class="o">=</span> <span class="n">firstNonNull</span><span
class="o">(</span><span class="n">staleSetState</span><span
class="o">.</span><span class="na">read</span><span class="o">(),</span> <span
class="kc">false</span><span class="o">);</span>
<span class="k">if</span> <span class="o">(</span><span
class="n">firstNonNull</span><span class="o">(</span><span
class="n">countState</span><span class="o">.</span><span
class="na">read</span><span class="o">(),</span> <span class="mi">0</span><span
class="o">)</span> <span class="o">==</span> <span class="mi">0</span><span
class="o">)</span> <span class="o">{</span>
- <span class="n">staleTimer</span><span class="o">.</span><span
class="na">offset</span><span class="o">(</span><span
class="n">MAX_BUFFER_DURATION</span><span class="o">).</span><span
class="na">setRelative</span><span class="o">());</span>
+ <span class="n">staleTimer</span><span class="o">.</span><span
class="na">offset</span><span class="o">(</span><span
class="n">MAX_BUFFER_DURATION</span><span class="o">).</span><span
class="na">setRelative</span><span class="o">();</span>
<span class="o">}</span>
<span class="err">…</span> <span class="n">same</span> <span
class="n">processing</span> <span class="n">logic</span> <span
class="n">as</span> <span class="n">above</span> <span class="err">…</span>
@@ -581,8 +633,38 @@ callback will fire and enrich and emit any buffered elements.</p>
</code></pre>
</div>
-<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="c"># State and timers are not yet supported in Beam's Python SDK.</span>
-<span class="c"># Follow https://issues.apache.org/jira/browse/BEAM-2687 for
updates.</span>
+<div class="language-py highlighter-rouge"><pre class="highlight"><code><span
class="k">class</span> <span class="nc">StatefulBufferingFn</span><span
class="p">(</span><span class="n">beam</span><span class="o">.</span><span
class="n">DoFn</span><span class="p">):</span>
+ <span class="err">…</span>
+
+ <span class="n">STALE_TIMER</span> <span class="o">=</span> <span
class="n">TimerSpec</span><span class="p">(</span><span
class="s">'stale'</span><span class="p">,</span> <span
class="n">TimeDomain</span><span class="o">.</span><span
class="n">REAL_TIME</span><span class="p">)</span>
+
+ <span class="n">MAX_BUFFER_DURATION</span> <span class="o">=</span> <span
class="mi">1</span>
+
+ <span class="k">def</span> <span class="nf">process</span><span
class="p">(</span><span class="bp">self</span><span class="p">,</span> <span
class="n">element</span><span class="p">,</span>
+ <span class="n">w</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">WindowParam</span><span class="p">,</span>
+ <span class="n">buffer_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">BUFFER_STATE</span><span class="p">),</span>
+ <span class="n">count_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">COUNT_STATE</span><span class="p">),</span>
+ <span class="n">expiry_timer</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">TimerParam</span><span
class="p">(</span><span class="n">EXPIRY_TIMER</span><span class="p">),</span>
+ <span class="n">stale_timer</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">TimerParam</span><span
class="p">(</span><span class="n">STALE_TIMER</span><span class="p">)):</span>
+
+ <span class="k">if</span> <span class="n">count_state</span><span
class="o">.</span><span class="n">read</span><span class="p">()</span> <span
class="o">==</span> <span class="mi">0</span><span class="p">:</span>
+ <span class="c"># We set an absolute timestamp here (not an offset like
in the Java SDK)</span>
+ <span class="n">stale_timer</span><span class="o">.</span><span
class="nb">set</span><span class="p">(</span><span class="n">time</span><span
class="o">.</span><span class="n">time</span><span class="p">()</span> <span
class="o">+</span> <span class="n">StatefulBufferingFn</span><span
class="o">.</span><span class="n">MAX_BUFFER_DURATION</span><span
class="p">)</span>
+
+ <span class="err">…</span> <span class="n">same</span> <span
class="n">logic</span> <span class="k">as</span> <span class="n">above</span>
<span class="err">…</span>
+
+ <span class="nd">@on_timer</span><span class="p">(</span><span
class="n">STALE_TIMER</span><span class="p">)</span>
+ <span class="k">def</span> <span class="nf">stale</span><span
class="p">(</span><span class="bp">self</span><span class="p">,</span>
+ <span class="n">buffer_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">BUFFER_STATE</span><span class="p">),</span>
+ <span class="n">count_state</span><span class="o">=</span><span
class="n">beam</span><span class="o">.</span><span class="n">DoFn</span><span
class="o">.</span><span class="n">StateParam</span><span
class="p">(</span><span class="n">COUNT_STATE</span><span class="p">)):</span>
+ <span class="n">events</span> <span class="o">=</span> <span
class="n">buffer_state</span><span class="o">.</span><span
class="n">read</span><span class="p">()</span>
+
+ <span class="k">for</span> <span class="n">event</span> <span
class="ow">in</span> <span class="n">events</span><span class="p">:</span>
+ <span class="k">yield</span> <span class="n">event</span>
+
+ <span class="n">buffer_state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+ <span class="n">count_state</span><span class="o">.</span><span
class="n">clear</span><span class="p">()</span>
+
</code></pre>
</div>