This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0b417b8 MINOR: updates docs for KIP-358 (#5796)
0b417b8 is described below
commit 0b417b8331e44c64011ea0b48ac66d5fa8005e0f
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Oct 15 17:22:03 2018 -0700
MINOR: updates docs for KIP-358 (#5796)
Reviewers: Guozhang Wang <[email protected]>, Jim Galasyn
<[email protected]>
---
docs/streams/developer-guide/dsl-api.html | 64 +++++++++++-----------
.../developer-guide/interactive-queries.html | 10 ++--
docs/streams/developer-guide/processor-api.html | 14 ++---
docs/streams/upgrade-guide.html | 23 +++++++-
.../org/apache/kafka/streams/kstream/Windows.java | 3 +-
5 files changed, 68 insertions(+), 46 deletions(-)
diff --git a/docs/streams/developer-guide/dsl-api.html
b/docs/streams/developer-guide/dsl-api.html
index 1622702..86f6c78 100644
--- a/docs/streams/developer-guide/dsl-api.html
+++ b/docs/streams/developer-guide/dsl-api.html
@@ -1021,13 +1021,13 @@
<p>The windowed <code class="docutils
literal"><span class="pre">aggregate</span></code> turns a <code
class="docutils literal"><span class="pre">TimeWindowedKStream<K,</span>
<span class="pre">V></span></code> or <code class="docutils literal"><span
class="pre">SessionWindowdKStream<K,</span> <span
class="pre">V></span></code>
into a windowed <code class="docutils
literal"><span class="pre">KTable<Windowed<K>,</span> <span
class="pre">V></span></code>.</p>
<p>Several variants of <code class="docutils
literal"><span class="pre">aggregate</span></code> exist, see Javadocs for
details.</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KGroupedStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">groupedStream</span> <span class="o">=</span> <span
class="o">...;</span>
<span class="c1">// Java 8+ examples, using lambda expressions</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute
tumbling windows)</span>
-<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span class="na [...]
+<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span class="na [...]
<span class="o">.</span><span class="na">aggregate</span><span
class="o">(</span>
<span class="o">()</span> <span class="o">-></span> <span
class="mi">0</span><span class="n">L</span><span class="o">,</span> <span
class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span
class="o">,</span> <span class="n">newValue</span><span class="o">,</span>
<span class="n">aggValue</span><span class="o">)</span> <span
class="o">-></span> <span class="n">aggValue</span> <span class="o">+</span>
<span class="n">newValue</span><span class="o">,</span> <span class="cm">/*
adder */</span>
@@ -1035,7 +1035,7 @@
<span class="o">.</span><span class="na">withValueSerde</span><span
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">()));</span> <span class="cm">/* serde
for aggregate value */</span>
<span class="c1">// Aggregating with session-based windowing (here: with an
inactivity gap of 5 minutes)</span>
-<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">SessionWindows</span><span class="o">.</span><span clas [...]
+<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">SessionWindows</span><span class="o">.</span><span clas [...]
<span class="n">aggregate</span><span class="o">(</span>
<span class="o">()</span> <span class="o">-></span> <span
class="mi">0</span><span class="n">L</span><span class="o">,</span> <span
class="cm">/* initializer */</span>
<span class="o">(</span><span class="n">aggKey</span><span
class="o">,</span> <span class="n">newValue</span><span class="o">,</span>
<span class="n">aggValue</span><span class="o">)</span> <span
class="o">-></span> <span class="n">aggValue</span> <span class="o">+</span>
<span class="n">newValue</span><span class="o">,</span> <span class="cm">/*
adder */</span>
@@ -1046,7 +1046,7 @@
<span class="c1">// Java 7 examples</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute
tumbling windows)</span>
-<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span class="na [...]
+<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span class="na [...]
<span class="o">.</span><span class="na">aggregate</span><span
class="o">(</span>
<span class="k">new</span> <span class="n">Initializer</span><span
class="o"><</span><span class="n">Long</span><span class="o">>()</span>
<span class="o">{</span> <span class="cm">/* initializer */</span>
<span class="nd">@Override</span>
@@ -1064,7 +1064,7 @@
<span class="o">.</span><span class="na">withValueSerde</span><span
class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">()));</span>
<span class="c1">// Aggregating with session-based windowing (here: with an
inactivity gap of 5 minutes)</span>
-<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">SessionWindows</span><span class="o">.</span><span clas [...]
+<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">sessionizedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">SessionWindows</span><span class="o">.</span><span clas [...]
<span class="n">aggregate</span><span class="o">(</span>
<span class="k">new</span> <span class="n">Initializer</span><span
class="o"><</span><span class="n">Long</span><span class="o">>()</span>
<span class="o">{</span> <span class="cm">/* initializer */</span>
<span class="nd">@Override</span>
@@ -1144,17 +1144,17 @@
<p>The windowed <code class="docutils
literal"><span class="pre">count</span></code> turns a <code class="docutils
literal"><span class="pre">TimeWindowedKStream<K,</span> <span
class="pre">V></span></code> or <code class="docutils literal"><span
class="pre">SessionWindowedKStream<K,</span> <span
class="pre">V></span></code>
into a windowed <code class="docutils
literal"><span class="pre">KTable<Windowed<K>,</span> <span
class="pre">V></span></code>.</p>
<p>Several variants of <code class="docutils
literal"><span class="pre">count</span></code> exist, see Javadocs for
details.</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KGroupedStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">groupedStream</span> <span class="o">=</span> <span
class="o">...;</span>
<span class="c1">// Counting a KGroupedStream with time-based windowing (here:
with 5-minute tumbling windows)</span>
<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">aggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span>
- <span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* time-based
window */</span>
+ <span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* time-based
window */</span>
<span class="o">.</span><span class="na">count</span><span
class="o">();</span>
<span class="c1">// Counting a KGroupedStream with session-based windowing
(here: with 5-minute inactivity gaps)</span>
<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">aggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span>
- <span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* session
window */</span>
+ <span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* session
window */</span>
<span class="o">.</span><span class="na">count</span><span
class="o">();</span>
</pre></div>
</div>
@@ -1262,21 +1262,21 @@
<p>The windowed <code class="docutils
literal"><span class="pre">reduce</span></code> turns a turns a <code
class="docutils literal"><span class="pre">TimeWindowedKStream<K,</span>
<span class="pre">V></span></code> or a <code class="docutils literal"><span
class="pre">SessionWindowedKStream<K,</span> <span
class="pre">V></span></code>
into a windowed <code class="docutils
literal"><span class="pre">KTable<Windowed<K>,</span> <span
class="pre">V></span></code>.</p>
<p>Several variants of <code class="docutils
literal"><span class="pre">reduce</span></code> exist, see Javadocs for
details.</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KGroupedStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">groupedStream</span> <span class="o">=</span> <span
class="o">...;</span>
<span class="c1">// Java 8+ examples, using lambda expressions</span>
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute
tumbling windows)</span>
<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span>
- <span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based
window */</span><span class="o">)</span>
+ <span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based
window */</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduce</span><span
class="o">(</span>
<span class="o">(</span><span class="n">aggValue</span><span
class="o">,</span> <span class="n">newValue</span><span class="o">)</span>
<span class="o">-></span> <span class="n">aggValue</span> <span
class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder
*/</span>
<span class="o">);</span>
<span class="c1">// Aggregating with session-based windowing (here: with an
inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">sessionzedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span>
- <span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* session
window */</span>
+ <span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* session
window */</span>
<span class="o">.</span><span class="na">reduce</span><span
class="o">(</span>
<span class="o">(</span><span class="n">aggValue</span><span
class="o">,</span> <span class="n">newValue</span><span class="o">)</span>
<span class="o">-></span> <span class="n">aggValue</span> <span
class="o">+</span> <span class="n">newValue</span> <span class="cm">/* adder
*/</span>
<span class="o">);</span>
@@ -1286,7 +1286,7 @@
<span class="c1">// Aggregating with time-based windowing (here: with 5-minute
tumbling windows)</span>
<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">..</span><span
class="na">windowedBy</span><span class="o">(</span>
- <span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based
window */</span><span class="o">)</span>
+ <span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">))</span> <span class="cm">/* time-based
window */</span><span class="o">)</span>
<span class="o">.</span><span class="na">reduce</span><span
class="o">(</span>
<span class="k">new</span> <span class="n">Reducer</span><span
class="o"><</span><span class="n">Long</span><span class="o">>()</span>
<span class="o">{</span> <span class="cm">/* adder */</span>
<span class="nd">@Override</span>
@@ -1297,7 +1297,7 @@
<span class="c1">// Aggregating with session-based windowing (here: with an
inactivity gap of 5 minutes)</span>
<span class="n">KTable</span><span class="o"><</span><span
class="n">Windowed</span><span class="o"><</span><span
class="n">String</span><span class="o">>,</span> <span
class="n">Long</span><span class="o">></span> <span
class="n">timeWindowedAggregatedStream</span> <span class="o">=</span> <span
class="n">groupedStream</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span>
- <span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* session
window */</span>
+ <span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)))</span> <span class="cm">/* session
window */</span>
<span class="o">.</span><span class="na">reduce</span><span
class="o">(</span>
<span class="k">new</span> <span class="n">Reducer</span><span
class="o"><</span><span class="n">Long</span><span class="o">>()</span>
<span class="o">{</span> <span class="cm">/* adder */</span>
<span class="nd">@Override</span>
@@ -1761,14 +1761,14 @@
<p><strong>Data must be
co-partitioned</strong>: The input data for both sides must be <a
class="reference internal"
href="#streams-developer-guide-dsl-joins-co-partitioning"><span class="std
std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of
a stream if and only if the stream was marked for re-partitioning (if both are
marked, both are re-partitioned).</strong></p>
<p>Several variants of <code
class="docutils literal"><span class="pre">join</span></code> exists, see the
Javadocs for details.</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">></span> <span class="n">left</span>
<span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Double</span><span class="o">></span> <span class="n">right</span>
<span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ example, using lambda expressions</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">String</span><span class="o">></span> <span
class="n">joined</span> <span class="o">=</span> <span
class="n">left</span><span class="o">.</span><span class="na">join</span><span
class="o">(</span><span class="n">right</span><span class="o">,</span>
<span class="o">(</span><span class="n">leftValue</span><span
class="o">,</span> <span class="n">rightValue</span><span class="o">)</span>
<span class="o">-></span> <span class="s">"left="</span> <span
class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span>
<span class="s">", right="</span> <span class="o">+</span> <span
class="n">rightValue</span><span class="o">,</span> <span class="cm">/*
ValueJoiner */</span>
- <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
+ <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">String</span><span class="o">(),</span> <span class="cm">/* key
*/</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">(),</span> <span class="cm">/* left
value */</span>
@@ -1783,7 +1783,7 @@
<span class="k">return</span> <span class="s">"left="</span>
<span class="o">+</span> <span class="n">leftValue</span> <span
class="o">+</span> <span class="s">", right="</span> <span
class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span>
- <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
+ <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">String</span><span class="o">(),</span> <span class="cm">/* key
*/</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">(),</span> <span class="cm">/* left
value */</span>
@@ -1820,14 +1820,14 @@
<p><strong>Data must be
co-partitioned</strong>: The input data for both sides must be <a
class="reference internal"
href="#streams-developer-guide-dsl-joins-co-partitioning"><span class="std
std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of
a stream if and only if the stream was marked for re-partitioning (if both are
marked, both are re-partitioned).</strong></p>
<p>Several variants of <code
class="docutils literal"><span class="pre">leftJoin</span></code> exists, see
the Javadocs for details.</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">></span> <span class="n">left</span>
<span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Double</span><span class="o">></span> <span class="n">right</span>
<span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ example, using lambda expressions</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">String</span><span class="o">></span> <span
class="n">joined</span> <span class="o">=</span> <span
class="n">left</span><span class="o">.</span><span
class="na">leftJoin</span><span class="o">(</span><span
class="n">right</span><span class="o">,</span>
<span class="o">(</span><span class="n">leftValue</span><span
class="o">,</span> <span class="n">rightValue</span><span class="o">)</span>
<span class="o">-></span> <span class="s">"left="</span> <span
class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span>
<span class="s">", right="</span> <span class="o">+</span> <span
class="n">rightValue</span><span class="o">,</span> <span class="cm">/*
ValueJoiner */</span>
- <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
+ <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">String</span><span class="o">(),</span> <span class="cm">/* key
*/</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">(),</span> <span class="cm">/* left
value */</span>
@@ -1842,7 +1842,7 @@
<span class="k">return</span> <span class="s">"left="</span>
<span class="o">+</span> <span class="n">leftValue</span> <span
class="o">+</span> <span class="s">", right="</span> <span
class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span>
- <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
+ <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">String</span><span class="o">(),</span> <span class="cm">/* key
*/</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">(),</span> <span class="cm">/* left
value */</span>
@@ -1882,14 +1882,14 @@
<p><strong>Data must be
co-partitioned</strong>: The input data for both sides must be <a
class="reference internal"
href="#streams-developer-guide-dsl-joins-co-partitioning"><span class="std
std-ref">co-partitioned</span></a>.</p>
<p><strong>Causes data re-partitioning of
a stream if and only if the stream was marked for re-partitioning (if both are
marked, both are re-partitioned).</strong></p>
<p>Several variants of <code
class="docutils literal"><span class="pre">outerJoin</span></code> exists, see
the Javadocs for details.</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">></span> <span class="n">left</span>
<span class="o">=</span> <span class="o">...;</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Double</span><span class="o">></span> <span class="n">right</span>
<span class="o">=</span> <span class="o">...;</span>
<span class="c1">// Java 8+ example, using lambda expressions</span>
<span class="n">KStream</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">String</span><span class="o">></span> <span
class="n">joined</span> <span class="o">=</span> <span
class="n">left</span><span class="o">.</span><span
class="na">outerJoin</span><span class="o">(</span><span
class="n">right</span><span class="o">,</span>
<span class="o">(</span><span class="n">leftValue</span><span
class="o">,</span> <span class="n">rightValue</span><span class="o">)</span>
<span class="o">-></span> <span class="s">"left="</span> <span
class="o">+</span> <span class="n">leftValue</span> <span class="o">+</span>
<span class="s">", right="</span> <span class="o">+</span> <span
class="n">rightValue</span><span class="o">,</span> <span class="cm">/*
ValueJoiner */</span>
- <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
+ <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">String</span><span class="o">(),</span> <span class="cm">/* key
*/</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">(),</span> <span class="cm">/* left
value */</span>
@@ -1904,7 +1904,7 @@
<span class="k">return</span> <span class="s">"left="</span>
<span class="o">+</span> <span class="n">leftValue</span> <span
class="o">+</span> <span class="s">", right="</span> <span
class="o">+</span> <span class="n">rightValue</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">},</span>
- <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
+ <span class="n">JoinWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">)),</span>
<span class="n">Joined</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">String</span><span class="o">(),</span> <span class="cm">/* key
*/</span>
<span class="n">Serdes</span><span class="o">.</span><span
class="na">Long</span><span class="o">(),</span> <span class="cm">/* left
value */</span>
@@ -2804,7 +2804,7 @@
Old records in the state store are purged after the
specified
<a class="reference internal"
href="../core-concepts.html#streams_concepts_windowing"><span class="std
std-ref">window retention period</span></a>.
Kafka Streams guarantees to keep a window for at least
this specified time; the default value is one day and can be
- changed via <code class="docutils literal"><span
class="pre">Windows#until()</span></code> and <code class="docutils
literal"><span class="pre">SessionWindows#until()</span></code>.</p>
+ changed via <code class="docutils literal"><span
class="pre">Materialized#withRetention()</span></code>.</p>
<p>The DSL supports the following types of windows:</p>
<table border="1" class="docutils">
<colgroup>
@@ -2857,12 +2857,12 @@ become t=300,000).</span></p>
windows with a size of 5000ms have predictable
window boundaries <code class="docutils literal"><span
class="pre">[0;5000),[5000;10000),...</span></code> — and
<strong>not</strong>
<code class="docutils literal"><span
class="pre">[1000;6000),[6000;11000),...</span></code> or even something
“random” like <code class="docutils literal"><span
class="pre">[1452;6452),[6452;11452),...</span></code>.</p>
<p>The following code defines a tumbling window with a
size of 5 minutes:</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="kn">import</span> <span
class="nn">org.apache.kafka.streams.kstream.TimeWindows</span><span
class="o">;</span>
<span class="c1">// A tumbling time window with a size of 5 minutes (and, by
definition, an implicit</span>
<span class="c1">// advance interval of 5 minutes).</span>
-<span class="kt">long</span> <span class="n">windowSizeMs</span> <span
class="o">=</span> <span class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">);</span> <span class="c1">// 5 * 60 *
1000L</span>
+<span class="kt">Duration</span> <span class="n">windowSizeMs</span> <span
class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">);</span>
<span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">windowSizeMs</span><span class="o">);</span>
<span class="c1">// The above is equivalent to the following code:</span>
@@ -2884,13 +2884,13 @@ become t=300,000).</span></p>
terminology in academic literature, where the
semantics of sliding windows are different to those of hopping windows.</p>
</div>
<p>The following code defines a hopping window with a
size of 5 minutes and an advance interval of 1 minute:</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="kn">import</span> <span
class="nn">org.apache.kafka.streams.kstream.TimeWindows</span><span
class="o">;</span>
<span class="c1">// A hopping time window with a size of 5 minutes and an
advance interval of 1 minute.</span>
<span class="c1">// The window's name -- the string parameter -- is used
to e.g. name the backing state store.</span>
-<span class="kt">long</span> <span class="n">windowSizeMs</span> <span
class="o">=</span> <span class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">);</span> <span class="c1">// 5 * 60 *
1000L</span>
-<span class="kt">long</span> <span class="n">advanceMs</span> <span
class="o">=</span> <span class="n">TimeUnit</span><span
class="o">.</span><span class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">1</span><span class="o">);</span> <span class="c1">// 1 * 60 *
1000L</span>
+<span class="kt">Duration</span> <span class="n">windowSizeMs</span> <span
class="o">=</span> <span class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">);</span>
+<span class="kt">Duration</span> <span class="n">advanceMs</span> <span
class="o">=</span> <span class="n">Duration</span><span
class="o">.</span><span class="na">ofMinutes</span><span
class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span
class="n">windowSizeMs</span><span class="o">).</span><span
class="na">advanceBy</span><span class="o">(</span><span
class="n">advanceMs</span><span class="o">);</span>
</pre></div>
</div>
@@ -2938,11 +2938,11 @@ milliseconds (e.g. t=5 would become
t=300,000).</span></p>
simple metrics (e.g. count of user visits on a
news website or social platform) to more complex metrics (e.g. customer
conversion funnel and event flows).</p>
<p>The following code defines a session window with an
inactivity gap of 5 minutes:</p>
- <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.util.concurrent.TimeUnit</span><span class="o">;</span>
+ <div class="highlight-java"><div
class="highlight"><pre><span></span><span class="kn">import</span> <span
class="nn">java.time.Duration</span><span class="o">;</span>
<span class="kn">import</span> <span
class="nn">org.apache.kafka.streams.kstream.SessionWindows</span><span
class="o">;</span>
<span class="c1">// A session window with an inactivity gap of 5
minutes.</span>
-<span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">TimeUnit</span><span class="o">.</span><span
class="na">MINUTES</span><span class="o">.</span><span
class="na">toMillis</span><span class="o">(</span><span
class="mi">5</span><span class="o">));</span>
+<span class="n">SessionWindows</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">Duration</span><span class="o">.</span><span
class="na">ofMinutes</span><span class="o">(</span><span
class="mi">5</span><span class="o">));</span>
</pre></div>
</div>
<p>Given the previous session window example,
here’s what would happen on an input stream of six records.
@@ -3320,8 +3320,8 @@ t=5 (blue), which lead to a merge of sessions and an
extension of a session, res
<p>Here's an example of the classic WordCount program that uses
the Scala <code class="docutils literal"><span
class="pre">StreamsBuilder</span></code> that builds an instance of <code
class="docutils literal"><span class="pre">KStream</span></code> which is a
wrapper around Java <code class="docutils literal"><span
class="pre">KStream</span></code>. Then we reify to a table and get a <code
class="docutils literal"><span class="pre">KTable</span></code>, which, again
is a w [...]
<p>The net result is that the following code is structured just
like using the Java API, but with Scala and with far fewer type annotations
compared to using the Java API directly from Scala. The difference in type
annotation usage is more obvious when given an example. Below is an example
WordCount implementation that will be used to demonstrate the differences
between the Scala and Java API.</p>
<pre class="brush: scala;">
+import java.time.Duration
import java.util.Properties
-import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
@@ -3351,7 +3351,7 @@ object WordCountApplication extends App {
streams.start()
sys.ShutdownHookThread {
- streams.close(10, TimeUnit.SECONDS)
+ streams.close(Duration.ofSeconds(10))
}
}
</pre>
diff --git a/docs/streams/developer-guide/interactive-queries.html
b/docs/streams/developer-guide/interactive-queries.html
index 051f87c..ca0936e 100644
--- a/docs/streams/developer-guide/interactive-queries.html
+++ b/docs/streams/developer-guide/interactive-queries.html
@@ -202,7 +202,7 @@
<span class="o">.</span><span class="na">groupBy</span><span
class="o">((</span><span class="n">key</span><span class="o">,</span> <span
class="n">word</span><span class="o">)</span> <span class="o">-></span>
<span class="n">word</span><span class="o">,</span> <span
class="n">Serialized</span><span class="o">.</span><span
class="na">with</span><span class="o">(</span><span
class="n">stringSerde</span><span class="o">,</span> <span
class="n">stringSerde</span><span class="o">));</span>
<span class="c1">// Create a window state store named
"CountsWindowStore" that contains the word counts for every
minute</span>
-<span class="n">groupedByWord</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(</span><span class="mi">60000</span><span
class="o">))</span>
+<span class="n">groupedByWord</span><span class="o">.</span><span
class="na">windowedBy</span><span class="o">(</span><span
class="n">TimeWindows</span><span class="o">.</span><span
class="na">of</span><span class="o">(<span class="n">Duration</span><span
class="o">.</span><span class="na">ofSeconds</span><span
class="o">(</span><span class="mi">60</span><span class="o">)))</span>
<span class="o">.</span><span class="na">count</span><span
class="o">(</span><span class="n">Materialized</span><span
class="o">.<</span><span class="n">String</span><span class="o">,</span>
<span class="n">Long</span><span class="o">,</span> <span
class="n">WindowStore</span><span class="o"><</span><span
class="n">Bytes</span><span class="o">,</span> <span
class="kt">byte</span><span class="o">[]></span><span
class="n">as</span><span class="o">(</span><span class="s">"Co [...]
</pre></div>
</div>
@@ -213,8 +213,8 @@
<span class="c1">// Fetch values for the key "world" for all of the
windows available in this application instance.</span>
<span class="c1">// To get *all* available windows we fetch windows from the
beginning of time until now.</span>
-<span class="kt">long</span> <span class="n">timeFrom</span> <span
class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span
class="c1">// beginning of time = oldest available</span>
-<span class="kt">long</span> <span class="n">timeTo</span> <span
class="o">=</span> <span class="n">System</span><span class="o">.</span><span
class="na">currentTimeMillis</span><span class="o">();</span> <span
class="c1">// now (in processing-time)</span>
+<span class="kt">Instant</span> <span class="n">timeFrom</span> <span
class="o">=</span> <span class="na">Instant</span><span class="o">.</span><span
class="na">ofEpochMilli<span class="o">(</span><span class="mi">0</span><span
class="o">);</span> <span class="c1">// beginning of time = oldest
available</span>
+<span class="kt">Instant</span> <span class="n">timeTo</span> <span
class="o">=</span> <span class="n">Instant</span><span class="o">.</span><span
class="na">now</span><span class="o">();</span> <span class="c1">// now (in
processing-time)</span>
<span class="n">WindowStoreIterator</span><span class="o"><</span><span
class="n">Long</span><span class="o">></span> <span
class="n">iterator</span> <span class="o">=</span> <span
class="n">windowStore</span><span class="o">.</span><span
class="na">fetch</span><span class="o">(</span><span
class="s">"world"</span><span class="o">,</span> <span
class="n">timeFrom</span><span class="o">,</span> <span
class="n">timeTo</span><span class="o">);</span>
<span class="k">while</span> <span class="o">(</span><span
class="n">iterator</span><span class="o">.</span><span
class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o"><</span><span
class="n">Long</span><span class="o">,</span> <span class="n">Long</span><span
class="o">></span> <span class="n">next</span> <span class="o">=</span>
<span class="n">iterator</span><span class="o">.</span><span
class="na">next</span><span class="o">();</span>
@@ -295,8 +295,8 @@
<span class="kd">private</span> <span class="kd">final</span> <span
class="n">StateStoreProvider</span> <span class="n">provider</span><span
class="o">;</span>
<span class="kd">public</span> <span
class="nf">CustomStoreTypeWrapper</span><span class="o">(</span><span
class="kd">final</span> <span class="n">StateStoreProvider</span> <span
class="n">provider</span><span class="o">,</span>
- <span class="kd">final</span> <span
class="n">String</span> <span class="n">storeName</span><span class="o">,</span>
- <span class="kd">final</span> <span
class="n">QueryableStoreType</span><span class="o"><</span><span
class="n">MyReadableCustomStore</span><span class="o"><</span><span
class="n">K</span><span class="o">,</span> <span class="n">V</span><span
class="o">>></span> <span class="n">customStoreType</span><span
class="o">)</span> <span class="o">{</span>
+ <span class="kd">final</span> <span
class="n">String</span> <span class="n">storeName</span><span class="o">,</span>
+ <span class="kd">final</span> <span
class="n">QueryableStoreType</span><span class="o"><</span><span
class="n">MyReadableCustomStore</span><span class="o"><</span><span
class="n">K</span><span class="o">,</span> <span class="n">V</span><span
class="o">>></span> <span class="n">customStoreType</span><span
class="o">)</span> <span class="o">{</span>
<span class="c1">// ... assign fields ...</span>
<span class="o">}</span>
diff --git a/docs/streams/developer-guide/processor-api.html
b/docs/streams/developer-guide/processor-api.html
index 0c83154..0d3ded2 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -132,8 +132,8 @@
<span class="c1">// retrieve the key-value store named
"Counts"</span>
<span class="n">kvStore</span> <span class="o">=</span> <span
class="o">(</span><span class="n">KeyValueStore</span><span class="o">)</span>
<span class="n">context</span><span class="o">.</span><span
class="na">getStateStore</span><span class="o">(</span><span
class="s">"Counts"</span><span class="o">);</span>
- <span class="c1">// schedule a punctuate() method every 1000
milliseconds based on stream-time</span>
- <span class="k">this</span><span class="o">.</span><span
class="na">context</span><span class="o">.</span><span
class="na">schedule</span><span class="o">(</span><span
class="mi">1000</span><span class="o">,</span> <span
class="n">PunctuationType</span><span class="o">.</span><span
class="na">STREAM_TIME</span><span class="o">,</span> <span
class="o">(</span><span class="n">timestamp</span><span class="o">)</span>
<span class="o">-></span> <span class="o">{</span>
+ <span class="c1">// schedule a punctuate() method every second based on
stream-time</span>
+ <span class="k">this</span><span class="o">.</span><span
class="na">context</span><span class="o">.</span><span
class="na">schedule</span><span class="o">(</span><span
class="na">Duration</span><span class="o">.</span><span
class="na">ofSeconds</span><span class="o">(</span><span
class="mi">1000</span><span class="o">),</span> <span
class="n">PunctuationType</span><span class="o">.</span><span
class="na">STREAM_TIME</span><span class="o">,</span> <span
class="o">(</span><span class [...]
<span class="n">KeyValueIterator</span><span
class="o"><</span><span class="n">String</span><span class="o">,</span>
<span class="n">Long</span><span class="o">></span> <span
class="n">iter</span> <span class="o">=</span> <span class="k">this</span><span
class="o">.</span><span class="na">kvStore</span><span class="o">.</span><span
class="na">all</span><span class="o">();</span>
<span class="k">while</span> <span class="o">(</span><span
class="n">iter</span><span class="o">.</span><span
class="na">hasNext</span><span class="o">())</span> <span class="o">{</span>
<span class="n">KeyValue</span><span class="o"><</span><span
class="n">String</span><span class="o">,</span> <span
class="n">Long</span><span class="o">></span> <span class="n">entry</span>
<span class="o">=</span> <span class="n">iter</span><span
class="o">.</span><span class="na">next</span><span class="o">();</span>
@@ -365,11 +365,11 @@
<code class="docutils literal"><span
class="pre">headers</span></code>.</p>
<p>Here is an example implementation of how to add a new header to
the record:</p>
<div class="highlight-java"><div
class="highlight"><pre><span></span><span class="n">public void process(String
key, String value) {</span>
-
- <span class="c1">// add a header to the elements</span>
- <span class="n">context()</span><span class="o">.</span><span
class="na">headers</span><span class="o">()</span><span class="o">.</span><span
class="na">add</span><span class="o">.</span><span class="o">(</span><span
class="s">"key"</span><span class="o">,</span> <span
class="s">"key"</span>
- <span class="o">}</span>
- </pre></div>
+
+ <span class="c1">// add a header to the elements</span>
+ <span class="n">context()</span><span class="o">.</span><span
class="na">headers</span><span class="o">()</span><span class="o">.</span><span
class="na">add</span><span class="o">.</span><span class="o">(</span><span
class="s">"key"</span><span class="o">,</span> <span
class="s">"key"</span>
+<span class="o">}</span>
+</pre></div>
</div>
<div class="section" id="connecting-processors-and-state-stores">
<h2><a class="toc-backref" href="#id8">Connecting Processors and
State Stores</a><a class="headerlink"
href="#connecting-processors-and-state-stores" title="Permalink to this
headline"></a></h2>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index ddf6e4a..337366b 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -91,7 +91,28 @@
</p>
<p>
- We added a new serde for UUIDs (<code>Serdes.UUIDSerde</code>) that
you can use via <code>Serdes.UUID()</code> (cf. <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization">KIP-206</a>).
+ We added a new serde for UUIDs (<code>Serdes.UUIDSerde</code>) that
you can use via <code>Serdes.UUID()</code>
+ (cf. <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-206%3A+Add+support+for+UUID+serialization+and+deserialization">KIP-206</a>).
+ </p>
+
+ <p>
+ We updated a list of methods that take <code>long</long> arguments as
either timestamp (fix point) or duration (time period)
+ and replaced them with <code>Instant</code> and <code>Duration</code>
parameters for improved semantics.
+ Some old methods base on <code>long</code> are deprecated and users
are encouraged to update their code.
+ <br />
+ In particular, aggregation windows (hopping/tumbling/unlimited time
windows and session windows) as well as join windows now take
<code>Duration</code>
+ arguments to specify window size, hop, and gap parameters.
+ Also, window sizes and retention times are now specified as
<code>Duration</code> type in <code>Stores</code> class.
+ The <code>Window</code> class has new methods
<code>#startTime()</code> and <code>#endTime</code> that return window
start/end timestamp as <code>Instant</code>.
+ For interactive queries, there are new <code>#fetch(...)</code>
overloads taking <code>Instant</code> arguments.
+ Additionally, punctuations are now registerd via
<code>ProcessorContext#schedule(Duration interval, ...)</code>.
+ For more details, see <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>.
+ </p>
+
+ <p>
+ We deprecated <code>KafkaStreams#close(...)</code> and replaced it
with <code>KafkaStreams#close(Duration)</code> that accepts a single timeout
argument
+ Note: the new <code>#close</code> method has improved (but slightly
different) semantics than the old one.
+ For more details, see <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times">KIP-358</a>.
</p>
<h3><a id="streams_api_changes_200"
href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
index b6b2178..4dfba23 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import java.time.Duration;
import java.util.Map;
import static
org.apache.kafka.streams.kstream.internals.WindowingDefaults.DEFAULT_RETENTION_MS;
@@ -57,7 +58,7 @@ public abstract class Windows<W extends Window> {
* @param durationMs the window retention time in milliseconds
* @return itself
* @throws IllegalArgumentException if {@code durationMs} is negative
- * @deprecated since 2.1. Use {@link Materialized#withRetention(long)}
+ * @deprecated since 2.1. Use {@link Materialized#withRetention(Duration)}
* or directly configure the retention in a store supplier and
use {@link Materialized#as(WindowBytesStoreSupplier)}.
*/
@Deprecated