This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bfdef11  KAFKA-12360: Document new time semantics (#11003)
bfdef11 is described below

commit bfdef11b9781c2cdd4cd526aeca3bf3898d34422
Author: John Roesler <[email protected]>
AuthorDate: Mon Jul 12 16:16:29 2021 -0500

    KAFKA-12360: Document new time semantics (#11003)
    
    Update the docs for task idling, since the semantics have
    changed in 3.0.
    
    Reviewers: Jim Galasyn <[email protected]>, Luke Chen 
<[email protected]>, Boyang Chen <[email protected]>
---
 docs/streams/developer-guide/config-streams.html   | 67 ++++++++++++++++++++--
 docs/streams/upgrade-guide.html                    | 17 ++++++
 .../org/apache/kafka/streams/StreamsConfig.java    | 14 ++++-
 3 files changed, 92 insertions(+), 6 deletions(-)

diff --git a/docs/streams/developer-guide/config-streams.html 
b/docs/streams/developer-guide/config-streams.html
index f111dc2..05846a0 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -243,8 +243,21 @@ settings.put(... , ...);</code></pre>
           </tr>
           <tr class="row-even"><td>max.task.idle.ms</td>
             <td>Medium</td>
-            <td colspan="2">Maximum amount of time in milliseconds a stream 
task will stay idle while waiting for all partitions to contain data
-              and avoid potential out-of-order record processing across 
multiple input streams.</td>
+            <td colspan="2">
+              <p>
+                This config controls whether joins and merges may produce 
out-of-order results.
+                The config value is the maximum amount of time in milliseconds 
a stream task will stay idle
+                when it is fully caught up on some (but not all) input 
partitions
+                to wait for producers to send additional records and avoid 
potential
+                out-of-order record processing across multiple input streams.
+                The default (zero) does not wait for producers to send more 
records,
+                but it does wait to fetch data that is already present on the 
brokers.
+                This default means that for records that are already present 
on the brokers,
+                Streams will process them in timestamp order.
+                Set to -1 to disable idling entirely and process any locally 
available data,
+                even though doing so may produce out-of-order processing.
+              </p>
+            </td>
             <td>0 milliseconds</td>
           </tr>
           <tr class="row-odd"><td>max.warmup.replicas</td>
@@ -602,8 +615,54 @@ 
streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
           <span id="streams-developer-guide-max-task-idle-ms"></span><h4><a 
class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" 
href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
           <blockquote>
             <div>
-              The maximum amount of time a task will idle without processing 
data when waiting for all of its input partition buffers to contain records. 
This can help avoid potential out-of-order
-              processing when the task has multiple input streams, as in a 
join, for example. Setting this to a nonzero value may increase latency but 
will improve time synchronization.
+              <p>
+                This configuration controls how long Streams will wait to 
fetch data in order to
+                provide in-order processing semantics.
+              </p>
+              <p>
+                When processing a task that has multiple input partitions (as 
in a join or merge),
+                Streams needs to choose which partition to process the next 
record from.
+                When all input partitions have locally buffered data, Streams 
picks the partition
+                whose next record has the lowest timestamp. This has the 
desirable effect of
+                collating the input partitions in timestamp order, which is 
generally what you
+                want in a streaming join or merge. However, when Streams does 
not have any data
+                buffered locally for one of the partitions, it does not know 
whether the next
+                record for that partition will have a lower or higher 
timestamp than the remaining
+                partitions' records.
+              </p>
+              <p>
+                There are two cases to consider: either there is data in that 
partition on the
+                broker that Streams has not fetched yet, or Streams is fully 
caught up with that
+                partition on the broker, and the producers simply haven't 
produced any new records
+                since Streams polled the last batch.
+              </p>
+              <p>
+                The default value of
+                <code class="docutils literal"><span 
class="pre">0</span></code>
+                causes Streams to delay processing a task when it detects that 
it has no locally
+                buffered data for a partition, but there is data available on 
the brokers.
+                Specifically, this happens when there is an empty partition in the local 
buffer, but Streams
+                has a non-zero lag for that partition. However, as soon as 
Streams catches up to
+                the broker, it will continue processing, even if there is no 
data in one of the
+                partitions. That is, it will not wait for new data to be 
<em>produced</em>.
+                This default is designed to sacrifice some throughput in 
exchange for intuitively
+                correct join semantics.
+              </p>
+              <p>
+                Any config value greater than zero indicates the number of 
<em>extra</em>
+                milliseconds that Streams will wait if it has a caught-up but 
empty partition.
+                In other words, this is the amount of time to wait for new 
data to be produced
+                to the input partitions to ensure in-order processing of data 
in the event
+                of a slow producer.
+              </p>
+              <p>
+                The config value of
+                <code class="docutils literal"><span 
class="pre">-1</span></code>
+                indicates that Streams will never wait to buffer empty 
partitions before choosing
+                the next record by timestamp, which achieves maximum 
throughput at the expense of
+                introducing out-of-order processing.
+              </p>
+
             </div>
           </blockquote>
         </div>
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 136d093..a6eb6aa 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -96,6 +96,23 @@
 
     <h3><a id="streams_api_changes_300" 
href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
     <p>
+      We improved the semantics of
+      <a 
href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">task
 idling (<code>max.task.idle.ms</code>)</a>.
+      Now Streams provides stronger in-order join and merge processing 
semantics.
+      Streams's new default pauses processing on tasks with multiple input 
partitions
+      when one of the partitions has no data buffered locally but has a 
non-zero lag. In other
+      words, Streams will wait to fetch records that are already available on 
the broker. This
+      results in improved join semantics, since it allows Streams to 
interleave the two input
+      partitions in timestamp order instead of just processing whichever 
partition happens to be
+      buffered. There is an option to disable this new behavior, and there is 
also an option to
+      make Streams wait even longer for new records to be <em>produced</em> to 
the input partitions,
+      which you can use to get stronger time semantics when you know some of 
your producers may be
+      slow. See the
+      <a 
href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">config
 reference</a>
+      for more information, and <a 
href="https://cwiki.apache.org/confluence/x/JSXZCQ">KIP-695</a>
+      for the larger context of this change.
+    </p>
+    <p>
         Interactive Queries may throw new exceptions for different errors:
     </p>
     <ul>
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 79c2e3f..8c48aa1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -427,8 +427,18 @@ public class StreamsConfig extends AbstractConfig {
 
     /** {@code max.task.idle.ms} */
     public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
-    private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time 
in milliseconds a stream task will stay idle when not all of its partition 
buffers contain records," +
-        " to avoid potential out-of-order record processing across multiple 
input streams.";
+    private static final String MAX_TASK_IDLE_MS_DOC = "This config controls 
whether joins and merges"
+        + " may produce out-of-order results."
+        + " The config value is the maximum amount of time in milliseconds a 
stream task will stay idle"
+        + " when it is fully caught up on some (but not all) input partitions"
+        + " to wait for producers to send additional records and avoid 
potential"
+        + " out-of-order record processing across multiple input streams."
+        + " The default (zero) does not wait for producers to send more 
records,"
+        + " but it does wait to fetch data that is already present on the 
brokers."
+        + " This default means that for records that are already present on 
the brokers,"
+        + " Streams will process them in timestamp order."
+        + " Set to -1 to disable idling entirely and process any locally 
available data,"
+        + " even though doing so may produce out-of-order processing.";
 
     /** {@code max.warmup.replicas} */
     public static final String MAX_WARMUP_REPLICAS_CONFIG = 
"max.warmup.replicas";

Reply via email to