This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 4.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a1b3dfe07d28a20f778c4abecbc1f792eff2b940 Author: Matthias J. Sax <[email protected]> AuthorDate: Mon Dec 16 15:21:52 2024 -0800 MINOR: improve Kafka Streams config docs (#18087) Reviewers: Anna Sophie Blee-Goldman <[email protected]> --- docs/streams/developer-guide/config-streams.html | 104 +++++++++++++-------- .../org/apache/kafka/streams/StreamsConfig.java | 44 ++++----- 2 files changed, 86 insertions(+), 62 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index 62338390149..dc640d2cee6 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -232,9 +232,9 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> <p>Here are the optional <a href="/{{version}}/javadoc/org/apache/kafka/streams/StreamsConfig.html">Streams</a> javadocs, sorted by level of importance:</p> <blockquote> <div><ul class="simple"> - <li>High: These parameters can have a significant impact on performance. Take care when deciding the values of these parameters.</li> - <li>Medium: These parameters can have some impact on performance. Your specific environment will determine how much tuning effort should be focused on these parameters.</li> - <li>Low: These parameters have a less general or less significant impact on performance.</li> + <li>High: These are parameters with a default value which is most likely not a good fit for production use. It's highly recommended to revisit these parameters for production usage.</li> + <li>Medium: The default values of these parameters should work for production for many cases, but it's not uncommon that they are changed, for example to tune performance.</li> + <li>Low: It should rarely be necessary to change the value for these parameters. 
It's only recommended to change them if there is a very specific issue you want to address.</li> </ul> </div></blockquote> <table border="1" class="non-scrolling-table docutils"> @@ -266,12 +266,12 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> <tr class="row-odd"><td>statestore.cache.max.bytes</td> <td>Medium</td> <td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td> - <td>10485760</td> + <td><code class="docutils literal"><span class="pre">10485760</span></code></td> </tr> <tr class="row-even"><td>cache.max.bytes.buffering (Deprecated. Use statestore.cache.max.bytes instead.)</td> <td>Medium</td> <td colspan="2">Maximum number of memory bytes to be used for record caches across all threads.</td> - <td>10485760 bytes</td> + <td><code class="docutils literal"><span class="pre">10485760</span></code></td> </tr> <tr class="row-odd"><td>client.id</td> <td>Medium</td> @@ -282,7 +282,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> <tr class="row-even"><td>commit.interval.ms</td> <td>Low</td> <td colspan="2">The frequency in milliseconds with which to save the position (offsets in source topics) of tasks.</td> - <td>30000 milliseconds (30 seconds)</td> + <td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td> </tr> <tr class="row-odd"><td>default.deserialization.exception.handler (Deprecated. 
Use deserialization.exception.handler instead.)</td> <td>Medium</td> @@ -302,8 +302,9 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> </tr> <tr class="row-even"><td>default.timestamp.extractor</td> <td>Medium</td> - <td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td> - <td>See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td> + <td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface. + See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td> + <td><code class="docutils literal"><span class="pre">FailOnInvalidTimestamp</span></code></td> </tr> <tr class="row-odd"><td>default.value.serde</td> <td>Medium</td> @@ -317,7 +318,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> [DEPRECATED] The default state store type used by DSL operators. 
Deprecated in favor of <code>dsl.store.suppliers.class</code> </td> - <td><code>ROCKS_DB</code></td> + <td><code class="docutils literal"><span class="pre">"ROCKS_DB"</span></code></td> </tr> <tr class="row-odd"><td>deserialization.exception.handler</td> <td>Medium</td> @@ -336,7 +337,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> <tr class="row-odd"><td>log.summary.interval.ms</td> <td>Low</td> <td colspan="2">The output interval in milliseconds for logging summary information (disabled if negative).</td> - <td>120000 milliseconds (2 minutes)</td> + <td><code class="docutils literal"><span class="pre">120000</span></code> (2 minutes)</td> </tr> <tr class="row-even"><td>max.task.idle.ms</td> <td>Medium</td> @@ -355,7 +356,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> even though doing so may produce out-of-order processing. </p> </td> - <td>0 milliseconds</td> + <td><code class="docutils literal"><span class="pre">0</span></code></td> </tr> <tr class="row-odd"><td>max.warmup.replicas</td> <td>Medium</td> @@ -380,7 +381,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> <tr class="row-odd"><td>metrics.sample.window.ms</td> <td>Low</td> <td colspan="2">The window of time in milliseconds a metrics sample is computed over.</td> - <td>30000 milliseconds (30 seconds)</td> + <td><code class="docutils literal"><span class="pre">30000</span></code> (30 seconds)</td> </tr> <tr class="row-even"><td>num.standby.replicas</td> <td>High</td> @@ -395,7 +396,7 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> <tr class="row-even"><td>probing.rebalance.interval.ms</td> <td>Low</td> <td colspan="2">The maximum time in milliseconds to wait before triggering a rebalance to probe for warmup replicas that have sufficiently caught up.</td> - <td>600000 milliseconds (10 minutes)</td> + <td><code class="docutils literal"><span 
class="pre">600000</span></code> (10 minutes)</td> </tr> <tr class="row-odd"><td>processing.exception.handler</td> <td>Medium</td> @@ -404,10 +405,10 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> </tr> <tr class="row-even"><td>processing.guarantee</td> <td>Medium</td> - <td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> (default) - or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). Deprecated config options are - <code class="docutils literal"><span class="pre">"exactly_once"</span></code> (for EOS version 1) and <code class="docutils literal"><span class="pre">"exactly_once_beta"</span></code> (for EOS version 2, requires broker version 2.5+)</td>. - <td>See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a></td> + <td colspan="2">The processing mode. Can be either <code class="docutils literal"><span class="pre">"at_least_once"</span></code> + or <code class="docutils literal"><span class="pre">"exactly_once_v2"</span></code> (for EOS version 2, requires broker version 2.5+). + See <a class="reference internal" href="#streams-developer-guide-processing-guarantee"><span class="std std-ref">Processing Guarantee</span></a>.</td>
+ <td><code class="docutils literal"><span class="pre">"at_least_once"</span></code></td> </tr> <tr class="row-odd"><td>processor.wrapper.class</td> <td>Medium</td> @@ -424,68 +425,91 @@ streamsSettings.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);</code></pre> <tr class="row-odd"><td>poll.ms</td> <td>Low</td> <td colspan="2">The amount of time in milliseconds to block waiting for input.</td> - <td>100 milliseconds</td> + <td><code class="docutils literal"><span class="pre">100</span></code></td> </tr> - <tr class="row-even"><td>rack.aware.assignment.tags</td> - <td>Medium</td> + <tr class="row-even"><td>rack.aware.assignment.strategy</td> + <td>Low</td> + <td colspan="2">The strategy used for rack aware assignment. Acceptable values are + <code class="docutils literal"><span class="pre">"none"</span></code> (default), + <code class="docutils literal"><span class="pre">"min_traffic"</span></code>, and + <code class="docutils literal"><span class="pre">"balance_subtopology"</span></code>. + See <a class="reference internal" href="#rack-aware-assignment-strategy"><span class="std std-ref">Rack Aware Assignment Strategy</span></a>.</td> + <td><code class="docutils literal"><span class="pre">"none"</span></code></td> + </tr> + <tr class="row-odd"><td>rack.aware.assignment.tags</td> + <td>Low</td> <td colspan="2">List of tag keys used to distribute standby replicas across Kafka Streams clients. When configured, Kafka Streams will make a best-effort to distribute the standby tasks over - clients with different tag values.</td> + clients with different tag values. + See <a class="reference internal" href="#rack-aware-assignment-tags"><span class="std std-ref">Rack Aware Assignment Tags</span></a>.</td> <td>the empty list</td> </tr> - <tr class="row-odd"><td>replication.factor</td> + <tr class="row-even"><td>rack.aware.assignment.non_overlap_cost</td> + <td>Low</td> + <td colspan="2">Cost associated with moving tasks from existing assignment. 
+ See <a class="reference internal" href="#rack-aware-assignment-non-overlap-cost"><span class="std std-ref">Rack Aware Assignment Non-Overlap-Cost</span></a>.</td> + <td><code class="docutils literal"><span class="pre">null</span></code></td> + </tr> + <tr class="row-odd"><td>rack.aware.assignment.traffic_cost</td> + <td>Low</td> + <td colspan="2">Cost associated with cross rack traffic. + See <a class="reference internal" href="#rack-aware-assignment-traffic-cost"><span class="std std-ref">Rack Aware Assignment Traffic-Cost</span></a>.</td> + <td><code class="docutils literal"><span class="pre">null</span></code></td> + </tr> + <tr class="row-even"><td>replication.factor</td> <td>Medium</td> <td colspan="2">The replication factor for changelog topics and repartition topics created by the application. The default of <code>-1</code> (meaning: use broker default replication factor) requires broker version 2.4 or newer.</td> <td><code class="docutils literal"><span class="pre">-1</span></code></td> </tr> - <tr class="row-even"><td>retry.backoff.ms</td> - <td>Medium</td> + <tr class="row-odd"><td>retry.backoff.ms</td> + <td>Low</td> <td colspan="2">The amount of time in milliseconds, before a request is retried.</td> <td><code class="docutils literal"><span class="pre">100</span></code></td> </tr> - <tr class="row-odd"><td>rocksdb.config.setter</td> + <tr class="row-even"><td>rocksdb.config.setter</td> <td>Medium</td> <td colspan="2">The RocksDB configuration.</td> - <td></td> + <td><code class="docutils literal"><span class="pre">null</span></code></td> </tr> - <tr class="row-even"><td>state.cleanup.delay.ms</td> + <tr class="row-odd"><td>state.cleanup.delay.ms</td> <td>Low</td> <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td> - <td>600000 milliseconds (10 minutes)</td> + <td><code class="docutils literal"><span class="pre">600000</span></code> (10 minutes)</td> </tr> - <tr 
class="row-odd"><td>state.dir</td> + <tr class="row-even"><td>state.dir</td> <td>High</td> <td colspan="2">Directory location for state stores.</td> <td><code class="docutils literal"><span class="pre">/${java.io.tmpdir}/kafka-streams</span></code></td> </tr> - <tr class="row-even"><td>task.assignor.class</td> + <tr class="row-odd"><td>task.assignor.class</td> <td>Medium</td> <td colspan="2">A task assignor class or class name implementing the <code>TaskAssignor</code> interface.</td> <td>The high-availability task assignor.</td> </tr> - <tr class="row-odd"><td>task.timeout.ms</td> + <tr class="row-even"><td>task.timeout.ms</td> <td>Medium</td> <td colspan="2">The maximum amount of time in milliseconds a task might stall due to internal errors and retries until an error is raised. For a timeout of <code>0 ms</code>, a task would raise an error for the first internal error. For any timeout larger than <code>0 ms</code>, a task will retry at least once before an error is raised.</td> - <td>300000 milliseconds (5 minutes)</td> + <td><code class="docutils literal"><span class="pre">300000</span></code> (5 minutes)</td> </tr> - <tr class="row-even"><td>topology.optimization</td> + <tr class="row-odd"><td>topology.optimization</td> <td>Medium</td> <td colspan="2">A configuration telling Kafka Streams if it should optimize the topology and what optimizations to apply. Acceptable values are: <code>StreamsConfig.NO_OPTIMIZATION</code> (<code>none</code>), <code>StreamsConfig.OPTIMIZE</code> (<code>all</code>) or a comma separated list of specific optimizations: <code>StreamsConfig.REUSE_KTABLE_SOURCE_TOPICS</code> (<code>reuse.ktable.source.topics</code>), <code>StreamsConfig.MERGE_REPARTITION_TOPICS</code> (<code>merge.r [...] <code>StreamsConfig.SINGLE_STORE_SELF_JOIN</code> (<code>single.store.self.join</code>). 
</td> - <td><code>NO_OPTIMIZATION</code></td> + <td><code class="docutils literal"><span class="pre">"NO_OPTIMIZATION"</span></code></td> </tr> - <tr class="row-odd"><td>upgrade.from</td> + <tr class="row-even"><td>upgrade.from</td> <td>Medium</td> - <td colspan="2">The version you are upgrading from during a rolling upgrade.</td> - <td>See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td> + <td colspan="2">The version you are upgrading from during a rolling upgrade. + See <a class="reference internal" href="#streams-developer-guide-upgrade-from"><span class="std std-ref">Upgrade From</span></a></td> + <td><code class="docutils literal"><span class="pre">null</span></code></td> </tr> - <tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td> + <tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td> <td>Low</td> <td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. 
Allows for clock drift.</td> - <td>86400000 milliseconds (1 day)</td> + <td><code class="docutils literal"><span class="pre">86400000</span></code> (1 day)</td> </tr> - <tr class="row-odd"><td>window.size.ms</td> + <tr class="row-even"><td>window.size.ms</td> <td>Low</td> <td colspan="2">Sets window size for the deserializer in order to calculate window end times.</td> <td><code class="docutils literal"><span class="pre">null</span></code></td> diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 36acc2b2049..11dc049958e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -974,28 +974,6 @@ public class StreamsConfig extends AbstractConfig { DefaultProductionExceptionHandler.class.getName(), Importance.MEDIUM, PRODUCTION_EXCEPTION_HANDLER_CLASS_DOC) - .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, - Type.INT, - null, - Importance.MEDIUM, - RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC) - .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, - Type.STRING, - RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, - in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY), - Importance.MEDIUM, - RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) - .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, - Type.LIST, - Collections.emptyList(), - atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), - Importance.MEDIUM, - RACK_AWARE_ASSIGNMENT_TAGS_DOC) - .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, - Type.INT, - null, - Importance.MEDIUM, - RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC) .define(TASK_ASSIGNOR_CLASS_CONFIG, Type.STRING, null, @@ -1058,6 +1036,28 @@ public class StreamsConfig extends AbstractConfig { true, Importance.LOW, ENABLE_METRICS_PUSH_DOC) + .define(RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG, + Type.INT, + null, + 
Importance.LOW, + RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_DOC) + .define(RACK_AWARE_ASSIGNMENT_STRATEGY_CONFIG, + Type.STRING, + RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, + in(RACK_AWARE_ASSIGNMENT_STRATEGY_NONE, RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC, RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY), + Importance.LOW, + RACK_AWARE_ASSIGNMENT_STRATEGY_DOC) + .define(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, + Type.LIST, + Collections.emptyList(), + atMostOfSize(MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE), + Importance.LOW, + RACK_AWARE_ASSIGNMENT_TAGS_DOC) + .define(RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG, + Type.INT, + null, + Importance.LOW, + RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_DOC) .define(REPARTITION_PURGE_INTERVAL_MS_CONFIG, Type.LONG, DEFAULT_COMMIT_INTERVAL_MS,
