This is an automated email from the ASF dual-hosted git repository.

wcarlson pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/kafka-site.git


The following commit(s) were added to refs/heads/asf-site by this push:
     new 499a69bc MINOR: update 3.6 (#593)
499a69bc is described below

commit 499a69bceec751956df4dc40d9589fd08f2c4c81
Author: Walker Carlson <[email protected]>
AuthorDate: Tue Apr 2 10:47:44 2024 -0500

    MINOR: update 3.6 (#593)
---
 36/streams/developer-guide/dsl-api.html | 16 ++++++++++++++++
 36/streams/upgrade-guide.html           |  9 +++++++++
 2 files changed, 25 insertions(+)

diff --git a/36/streams/developer-guide/dsl-api.html b/36/streams/developer-guide/dsl-api.html
index 94f7f22b..2cacf39b 100644
--- a/36/streams/developer-guide/dsl-api.html
+++ b/36/streams/developer-guide/dsl-api.html
@@ -2818,6 +2818,7 @@ KStream&lt;String, String&gt; joined = left.join(right,
     (leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue, /* ValueJoiner */
     Joined.keySerde(Serdes.String()) /* key */
       .withValueSerde(Serdes.Long()) /* left value */
+      .withGracePeriod(Duration.ZERO) /* grace period */
   );
 
 // Java 7 example
@@ -2830,6 +2831,7 @@ KStream&lt;String, String&gt; joined = left.join(right,
     },
     Joined.keySerde(Serdes.String()) /* key */
       .withValueSerde(Serdes.Long()) /* left value */
+      .withGracePeriod(Duration.ZERO) /* grace period */
   );</code></pre>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -2849,6 +2851,12 @@ KStream&lt;String, String&gt; joined = left.join(right,
                                         <li>When the table is <a 
class="reference internal" href="#versioned-state-stores"><span class="std 
std-ref">versioned</span></a>,
                                             the table record to join with is 
determined by performing a timestamped lookup, i.e., the table record which is 
joined will be the latest-by-timestamp record with timestamp
                                             less than or equal to the stream 
record timestamp. If the stream record timestamp is older than the table's 
history retention, then the record is dropped.</li>
+                                        <li>To use the grace period, the table needs to be <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned</span></a>.
+                                            When a grace period is set, stream records are buffered for the specified grace period before the join looks up the matching table record by timestamp.
+                                            The grace period helps when a table record has a timestamp less than or equal to the stream record timestamp but arrives after the stream record.
+                                            If the table record arrives within the grace period, the join will still occur.
+                                            If the table record does not arrive within the grace period, the join will proceed as normal.
+                                        </li>
                                     </ul>
                                     <p class="last">See the semantics overview 
at the bottom of this section for a detailed description.</p>
                                 </td>
@@ -2872,6 +2880,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
     (leftValue, rightValue) -&gt; &quot;left=&quot; + leftValue + &quot;, right=&quot; + rightValue, /* ValueJoiner */
     Joined.keySerde(Serdes.String()) /* key */
       .withValueSerde(Serdes.Long()) /* left value */
+      .withGracePeriod(Duration.ZERO) /* grace period */
   );
 
 // Java 7 example
@@ -2884,6 +2893,7 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
     },
     Joined.keySerde(Serdes.String()) /* key */
       .withValueSerde(Serdes.Long()) /* left value */
+      .withGracePeriod(Duration.ZERO) /* grace period */
   );</code></pre>
                                     <p>Detailed behavior:</p>
                                     <ul>
@@ -2906,6 +2916,12 @@ KStream&lt;String, String&gt; joined = left.leftJoin(right,
                                         <li>When the table is <a 
class="reference internal" href="#versioned-state-stores"><span class="std 
std-ref">versioned</span></a>,
                                             the table record to join with is 
determined by performing a timestamped lookup, i.e., the table record which is 
joined will be the latest-by-timestamp record with timestamp
                                             less than or equal to the stream 
record timestamp. If the stream record timestamp is older than the table's 
history retention, then the record that is joined will be <code class="docutils 
literal"><span class="pre">null</span></code>.</li>
+                                        <li>To use the grace period, the table needs to be <a class="reference internal" href="#versioned-state-stores"><span class="std std-ref">versioned</span></a>.
+                                            When a grace period is set, stream records are buffered for the specified grace period before the join looks up the matching table record by timestamp.
+                                            The grace period helps when a table record has a timestamp less than or equal to the stream record timestamp but arrives after the stream record.
+                                            If the table record arrives within the grace period, the join will still occur.
+                                            If the table record does not arrive within the grace period, the join will proceed as normal.
+                                        </li>
                                     </ul>
                                     <p class="last">See the semantics overview 
at the bottom of this section for a detailed description.</p>
                                 </td>
diff --git a/36/streams/upgrade-guide.html b/36/streams/upgrade-guide.html
index d81df638..49ae808e 100644
--- a/36/streams/upgrade-guide.html
+++ b/36/streams/upgrade-guide.html
@@ -147,6 +147,15 @@
       as upper and lower bound (with semantics "no bound") to simplify the 
usage of the <code>RangeQuery</code> class.
     </p>
 
+    <p>
+        KStream-to-KTable joins now have an option for adding a grace period.
+        The grace period is enabled on the <code>Joined</code> object using the <code>withGracePeriod()</code> method.
+        This change was introduced in <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-923%3A+Add+A+Grace+Period+to+Stream+Table+Join">KIP-923</a>.
+        To use the grace period option in the Stream-Table join the table must 
be
+        <a 
href="/{{version}}/documentation/streams/developer-guide/dsl-api.html#versioned-state-stores">versioned</a>.
+        For more information, including how it can be enabled and further configured, see the <a href="/{{version}}/documentation/streams/developer-guide/dsl-api.html"><b>Kafka Streams Developer Guide</b></a>.
+    </p>
+
     <h3><a id="streams_api_changes_350" 
href="#streams_api_changes_350">Streams API changes in 3.5.0</a></h3>
     <p>      
       A new state store type, versioned key-value stores, was introduced in
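
The grace-period behavior described in the dsl-api.html hunks above can be illustrated with a toy model. The sketch below is not Kafka Streams code and is not part of this commit: the class GracePeriodJoinSketch and its methods onTable and onStream are hypothetical names, it handles a single key only, and it assumes that a buffered stream record is released once stream time reaches the record's timestamp plus the grace period. It shows an inner join in which a late-arriving table record is still matched because the stream record was held in the buffer.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Toy single-key model of a stream-table inner join with a grace period.
// This is NOT the Kafka Streams implementation; every name is hypothetical.
class GracePeriodJoinSketch {
    private final long graceMs;
    // Highest stream record timestamp seen so far.
    private long streamTime = Long.MIN_VALUE;
    // "Versioned table": timestamp -> value for one key.
    private final TreeMap<Long, String> tableVersions = new TreeMap<>();
    // Stream records waiting for their grace period to elapse.
    private final List<Map.Entry<Long, String>> buffer = new ArrayList<>();

    GracePeriodJoinSketch(long graceMs) {
        this.graceMs = graceMs;
    }

    // A table update arrives, possibly after stream records with later timestamps.
    void onTable(long timestamp, String value) {
        tableVersions.put(timestamp, value);
    }

    // A stream record arrives; returns the join results released by this call.
    List<String> onStream(long timestamp, String value) {
        buffer.add(new AbstractMap.SimpleImmutableEntry<>(timestamp, value));
        streamTime = Math.max(streamTime, timestamp);

        List<String> results = new ArrayList<>();
        Iterator<Map.Entry<Long, String>> it = buffer.iterator();
        while (it.hasNext()) {
            Map.Entry<Long, String> rec = it.next();
            if (rec.getKey() + graceMs > streamTime) {
                continue; // still inside its grace period, keep buffering
            }
            it.remove();
            // Timestamped lookup: latest table version at or before the record timestamp.
            Map.Entry<Long, String> match = tableVersions.floorEntry(rec.getKey());
            if (match != null) { // inner join: no table record means no output
                results.add("left=" + rec.getValue() + ", right=" + match.getValue());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        GracePeriodJoinSketch join = new GracePeriodJoinSketch(10L);
        System.out.println(join.onStream(5L, "s1"));  // s1 is buffered
        join.onTable(3L, "t1");                       // table record arrives late
        System.out.println(join.onStream(20L, "s2")); // stream time 20 releases s1
    }
}
```

With a grace period of zero in this model, s1 would be looked up immediately against an empty table and produce no output, which corresponds to the Duration.ZERO setting used in the examples in the diff.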
