This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new f69be68 KAFKA-12360: Document new time semantics (#11003)
f69be68 is described below
commit f69be68c5871a41baaf20683a1a70945fe026bf5
Author: John Roesler <[email protected]>
AuthorDate: Mon Jul 12 16:16:29 2021 -0500
KAFKA-12360: Document new time semantics (#11003)
Update the docs for task idling, since the semantics have
changed in 3.0.
Reviewers: Jim Galasyn <[email protected]>, Luke Chen
<[email protected]>, Boyang Chen <[email protected]>
---
docs/streams/developer-guide/config-streams.html | 67 ++++++++++++++++++++--
docs/streams/upgrade-guide.html | 17 ++++++
.../org/apache/kafka/streams/StreamsConfig.java | 14 ++++-
3 files changed, 92 insertions(+), 6 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html
index f111dc2..05846a0 100644
--- a/docs/streams/developer-guide/config-streams.html
+++ b/docs/streams/developer-guide/config-streams.html
@@ -243,8 +243,21 @@ settings.put(... , ...);</code></pre>
</tr>
<tr class="row-even"><td>max.task.idle.ms</td>
<td>Medium</td>
- <td colspan="2">Maximum amount of time in milliseconds a stream task will stay idle while waiting for all partitions to contain data
- and avoid potential out-of-order record processing across multiple input streams.</td>
+ <td colspan="2">
+ <p>
+ This config controls whether joins and merges may produce out-of-order results.
+ The config value is the maximum amount of time in milliseconds a stream task will stay idle
+ when it is fully caught up on some (but not all) input partitions
+ to wait for producers to send additional records and avoid potential
+ out-of-order record processing across multiple input streams.
+ The default (zero) does not wait for producers to send more records,
+ but it does wait to fetch data that is already present on the brokers.
+ This default means that for records that are already present on the brokers,
+ Streams will process them in timestamp order.
+ Set to -1 to disable idling entirely and process any locally available data,
+ even though doing so may produce out-of-order processing.
+ </p>
+ </td>
<td>0 milliseconds</td>
</tr>
<tr class="row-odd"><td>max.warmup.replicas</td>
@@ -602,8 +615,54 @@ streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
<span id="streams-developer-guide-max-task-idle-ms"></span><h4><a class="toc-backref" href="#id28">max.task.idle.ms</a><a class="headerlink" href="#max-task-idle-ms" title="Permalink to this headline"></a></h4>
<blockquote>
<div>
- The maximum amount of time a task will idle without processing data when waiting for all of its input partition buffers to contain records. This can help avoid potential out-of-order
- processing when the task has multiple input streams, as in a join, for example. Setting this to a nonzero value may increase latency but will improve time synchronization.
+ <p>
+ This configuration controls how long Streams will wait to fetch data in order to
+ provide in-order processing semantics.
+ </p>
+ <p>
+ When processing a task that has multiple input partitions (as in a join or merge),
+ Streams needs to choose which partition to process the next record from.
+ When all input partitions have locally buffered data, Streams picks the partition
+ whose next record has the lowest timestamp. This has the desirable effect of
+ collating the input partitions in timestamp order, which is generally what you
+ want in a streaming join or merge. However, when Streams does not have any data
+ buffered locally for one of the partitions, it does not know whether the next
+ record for that partition will have a lower or higher timestamp than the remaining
+ partitions' records.
+ </p>
+ <p>
+ There are two cases to consider: either there is data in that partition on the
+ broker that Streams has not fetched yet, or Streams is fully caught up with that
+ partition on the broker, and the producers simply haven't produced any new records
+ since Streams polled the last batch.
+ </p>
+ <p>
+ The default value of
+ <code class="docutils literal"><span class="pre">0</span></code>
+ causes Streams to delay processing a task when it detects that it has no locally
+ buffered data for a partition, but there is data available on the brokers.
+ Specifically, this happens when a partition is empty in the local buffer but Streams
+ still has a non-zero lag for that partition. However, as soon as Streams catches up to
+ the broker, it will continue processing, even if there is no data in one of the
+ partitions. That is, it will not wait for new data to be <em>produced</em>.
+ This default is designed to sacrifice some throughput in exchange for intuitively
+ correct join semantics.
+ </p>
+ <p>
+ Any config value greater than zero indicates the number of <em>extra</em>
+ milliseconds that Streams will wait if it has a caught-up but empty partition.
+ In other words, this is the amount of time to wait for new data to be produced
+ to the input partitions to ensure in-order processing of data in the event
+ of a slow producer.
+ </p>
+ <p>
+ The config value of
+ <code class="docutils literal"><span class="pre">-1</span></code>
+ indicates that Streams will never wait to buffer empty partitions before choosing
+ the next record by timestamp, which achieves maximum throughput at the expense of
+ introducing out-of-order processing.
+ </p>
+
</div>
</blockquote>
</div>
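The partition-selection and idling rules described in the config-streams.html change above can be sketched as a small Java class. This is a hypothetical illustration, not Streams' internal code: `TaskIdlingSketch`, `PartitionState`, `choose`, and the `idledMs` input are made-up names, and per-partition lag and elapsed idle time are passed in directly instead of being tracked by a real consumer.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the max.task.idle.ms rules described above.
 * This is NOT the Kafka Streams implementation; all names are illustrative.
 */
public final class TaskIdlingSketch {

    /** Sentinel meaning "do not process a record from this task right now". */
    public static final int WAIT = -1;

    /** Hypothetical view of one input partition of a task. */
    public static final class PartitionState {
        final Long headTimestamp; // timestamp of the next locally buffered record, or null if the buffer is empty
        final long lag;           // records on the broker not yet fetched; 0 means fully caught up

        public PartitionState(Long headTimestamp, long lag) {
            this.headTimestamp = headTimestamp;
            this.lag = lag;
        }
    }

    /**
     * Returns the index of the partition to process next, or WAIT.
     * maxTaskIdleMs is the config value (-1, 0, or a positive number of ms);
     * idledMs (hypothetical) is how long the task has already idled on a caught-up but empty partition.
     */
    public static int choose(List<PartitionState> partitions, long maxTaskIdleMs, long idledMs) {
        if (maxTaskIdleMs >= 0) {
            for (PartitionState p : partitions) {
                if (p.headTimestamp != null) {
                    continue; // this partition has buffered data
                }
                if (p.lag > 0) {
                    return WAIT; // data exists on the broker: wait to fetch it
                }
                if (idledMs < maxTaskIdleMs) {
                    return WAIT; // caught up but empty: wait up to maxTaskIdleMs extra for new data
                }
            }
        }
        // -1, or nothing left to wait for: pick the buffered record with the lowest timestamp.
        int best = WAIT;
        long bestTs = Long.MAX_VALUE;
        for (int i = 0; i < partitions.size(); i++) {
            Long ts = partitions.get(i).headTimestamp;
            if (ts != null && ts < bestTs) {
                best = i;
                bestTs = ts;
            }
        }
        return best; // still WAIT if no partition has buffered data
    }

    public static void main(String[] args) {
        List<PartitionState> oneLagging = Arrays.asList(
                new PartitionState(100L, 0), new PartitionState(null, 5));
        System.out.println(choose(oneLagging, 0, 0));  // -1 (WAIT): fetch the lagging partition first
        System.out.println(choose(oneLagging, -1, 0)); // 0: never idle, process what is buffered
    }
}
```

The sketch keeps the "should we wait?" check separate from the "which partition next?" choice, mirroring how the documentation first discusses empty partitions and then timestamp collation.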
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 136d093..a6eb6aa 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -96,6 +96,23 @@
<h3><a id="streams_api_changes_300" href="#streams_api_changes_300">Streams API changes in 3.0.0</a></h3>
<p>
+ We improved the semantics of
+ <a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">task idling (<code>max.task.idle.ms</code>)</a>.
+ Now Streams provides stronger in-order join and merge processing semantics.
+ Streams's new default pauses processing on tasks with multiple input partitions
+ when one of the partitions has no data buffered locally but has a non-zero lag. In other
+ words, Streams will wait to fetch records that are already available on the broker. This
+ results in improved join semantics, since it allows Streams to interleave the two input
+ partitions in timestamp order instead of just processing whichever partition happens to be
+ buffered. There is an option to disable this new behavior, and there is also an option to
+ make Streams wait even longer for new records to be <em>produced</em> to the input partitions,
+ which you can use to get stronger time semantics when you know some of your producers may be
+ slow. See the
+ <a href="/documentation/streams/developer-guide/config-streams.html#max-task-idle-ms">config reference</a>
+ for more information, and <a href="https://cwiki.apache.org/confluence/x/JSXZCQ">KIP-695</a>
+ for the larger context of this change.
+ </p>
+ <p>
Interactive Queries may throw new exceptions for different errors:
</p>
<ul>
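To choose among the behaviors listed in the upgrade note, an application sets `max.task.idle.ms` in its Streams properties. A minimal sketch, assuming placeholder values for `application.id` and `bootstrap.servers` and a hypothetical `streamsProps` helper; the key is written as a string literal (it matches `StreamsConfig.MAX_TASK_IDLE_MS_CONFIG`) so the example needs only the JDK.

```java
import java.util.Properties;

/**
 * Hypothetical helper showing the three kinds of max.task.idle.ms settings.
 * application.id and bootstrap.servers values are placeholders.
 */
public final class IdleConfigExamples {

    // Same key as StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, spelled out to avoid a Kafka dependency.
    static final String MAX_TASK_IDLE_MS = "max.task.idle.ms";

    static Properties streamsProps(long maxTaskIdleMs) {
        Properties props = new Properties();
        props.put("application.id", "idle-demo");         // placeholder application id
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put(MAX_TASK_IDLE_MS, Long.toString(maxTaskIdleMs));
        return props;
    }

    public static void main(String[] args) {
        // 3.0 default: wait only to fetch records already present on the brokers.
        Properties inOrder = streamsProps(0L);
        // Disable the new behavior: never idle, process whatever is buffered locally.
        Properties neverIdle = streamsProps(-1L);
        // Also wait up to 500 ms (an arbitrary example value) for slow producers.
        Properties slowProducers = streamsProps(500L);

        System.out.println(inOrder.getProperty(MAX_TASK_IDLE_MS));
        System.out.println(neverIdle.getProperty(MAX_TASK_IDLE_MS));
        System.out.println(slowProducers.getProperty(MAX_TASK_IDLE_MS));
    }
}
```

In a real application the resulting `Properties` would be passed to `new KafkaStreams(topology, props)`; the value only matters for tasks with more than one input partition, such as joins and merges.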
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 79c2e3f..8c48aa1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -427,8 +427,18 @@ public class StreamsConfig extends AbstractConfig {
/** {@code max.task.idle.ms} */
public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms";
- private static final String MAX_TASK_IDLE_MS_DOC = "Maximum amount of time in milliseconds a stream task will stay idle when not all of its partition buffers contain records," +
- " to avoid potential out-of-order record processing across multiple input streams.";
+ private static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges"
+ + " may produce out-of-order results."
+ + " The config value is the maximum amount of time in milliseconds a stream task will stay idle"
+ + " when it is fully caught up on some (but not all) input partitions"
+ + " to wait for producers to send additional records and avoid potential"
+ + " out-of-order record processing across multiple input streams."
+ + " The default (zero) does not wait for producers to send more records,"
+ + " but it does wait to fetch data that is already present on the brokers."
+ + " This default means that for records that are already present on the brokers,"
+ + " Streams will process them in timestamp order."
+ + " Set to -1 to disable idling entirely and process any locally available data,"
+ + " even though doing so may produce out-of-order processing.";
/** {@code max.warmup.replicas} */
public static final String MAX_WARMUP_REPLICAS_CONFIG = "max.warmup.replicas";