This is an automated email from the ASF dual-hosted git repository. damianguy pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new e1c5d0c MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395) e1c5d0c is described below commit e1c5d0c119b38a9ddb2b09b6309a3817d86d8e14 Author: Matt Farmer <m...@frmr.me> AuthorDate: Mon Jan 8 06:33:23 2018 -0500 MINOR: Add documentation for KAFKA-6086 (ProductionExceptionHandler) (#4395) * Update streams documentation to describe production exception handler * Add a mention of the ProductionExceptionHandler in the upgrade guide --- docs/streams/developer-guide/config-streams.html | 76 +++++++++++++++++++----- docs/streams/upgrade-guide.html | 9 ++- 2 files changed, 68 insertions(+), 17 deletions(-) diff --git a/docs/streams/developer-guide/config-streams.html b/docs/streams/developer-guide/config-streams.html index dbac7fb..256cc18 100644 --- a/docs/streams/developer-guide/config-streams.html +++ b/docs/streams/developer-guide/config-streams.html @@ -69,6 +69,7 @@ </li> <li><a class="reference internal" href="#optional-configuration-parameters" id="id6">Optional configuration parameters</a><ul> <li><a class="reference internal" href="#default-deserialization-exception-handler" id="id7">default.deserialization.exception.handler</a></li> + <li><a class="reference internal" href="#default-production-exception-handler" id="id24">default.production.exception.handler</a></li> <li><a class="reference internal" href="#default-key-serde" id="id8">default.key.serde</a></li> <li><a class="reference internal" href="#default-value-serde" id="id9">default.value.serde</a></li> <li><a class="reference internal" href="#num-standby-replicas" id="id10">num.standby.replicas</a></li> @@ -216,77 +217,82 @@ <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">DeserializationExceptionHandler</span></code> interface.</td> <td>30000 milliseconds</td> </tr> - <tr class="row-even"><td>key.serde</td> + <tr 
class="row-even"><td>default.production.exception.handler</td> + <td>Medium</td> + <td colspan="2">Exception handling class that implements the <code class="docutils literal"><span class="pre">ProductionExceptionHandler</span></code> interface.</td> + <td><code class="docutils literal"><span class="pre">DefaultProductionExceptionHandler</span></code></td> + </tr> + <tr class="row-odd"><td>key.serde</td> <td>Medium</td> <td colspan="2">Default serializer/deserializer class for record keys, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also value.serde).</td> <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td> </tr> - <tr class="row-odd"><td>metric.reporters</td> + <tr class="row-even"><td>metric.reporters</td> <td>Low</td> <td colspan="2">A list of classes to use as metrics reporters.</td> <td>the empty list</td> </tr> - <tr class="row-even"><td>metrics.num.samples</td> + <tr class="row-odd"><td>metrics.num.samples</td> <td>Low</td> <td colspan="2">The number of samples maintained to compute metrics.</td> <td>2</td> </tr> - <tr class="row-odd"><td>metrics.recording.level</td> + <tr class="row-even"><td>metrics.recording.level</td> <td>Low</td> <td colspan="2">The highest recording level for metrics.</td> <td><code class="docutils literal"><span class="pre">INFO</span></code></td> </tr> - <tr class="row-even"><td>metrics.sample.window.ms</td> + <tr class="row-odd"><td>metrics.sample.window.ms</td> <td>Low</td> <td colspan="2">The window of time a metrics sample is computed over.</td> <td>30000 milliseconds</td> </tr> - <tr class="row-odd"><td>num.standby.replicas</td> + <tr class="row-even"><td>num.standby.replicas</td> <td>Medium</td> <td colspan="2">The number of standby replicas for each task.</td> <td>0</td> </tr> - <tr class="row-even"><td>num.stream.threads</td> + <tr class="row-odd"><td>num.stream.threads</td> <td>Medium</td> <td colspan="2">The 
number of threads to execute stream processing.</td> <td>1</td> </tr> - <tr class="row-odd"><td>partition.grouper</td> + <tr class="row-even"><td>partition.grouper</td> <td>Low</td> <td colspan="2">Partition grouper class that implements the <code class="docutils literal"><span class="pre">PartitionGrouper</span></code> interface.</td> <td>See <a class="reference internal" href="#streams-developer-guide-partition-grouper"><span class="std std-ref">Partition Grouper</span></a></td> </tr> - <tr class="row-even"><td>poll.ms</td> + <tr class="row-odd"><td>poll.ms</td> <td>Low</td> <td colspan="2">The amount of time in milliseconds to block waiting for input.</td> <td>100 milliseconds</td> </tr> - <tr class="row-odd"><td>replication.factor</td> + <tr class="row-even"><td>replication.factor</td> <td>High</td> <td colspan="2">The replication factor for changelog topics and repartition topics created by the application.</td> <td>1</td> </tr> - <tr class="row-even"><td>state.cleanup.delay.ms</td> + <tr class="row-odd"><td>state.cleanup.delay.ms</td> <td>Low</td> <td colspan="2">The amount of time in milliseconds to wait before deleting state when a partition has migrated.</td> <td>6000000 milliseconds</td> </tr> - <tr class="row-odd"><td>state.dir</td> + <tr class="row-even"><td>state.dir</td> <td>High</td> <td colspan="2">Directory location for state stores.</td> <td><code class="docutils literal"><span class="pre">/var/lib/kafka-streams</span></code></td> </tr> - <tr class="row-even"><td>timestamp.extractor</td> + <tr class="row-odd"><td>timestamp.extractor</td> <td>Medium</td> <td colspan="2">Timestamp extractor class that implements the <code class="docutils literal"><span class="pre">TimestampExtractor</span></code> interface.</td> <td>See <a class="reference internal" href="#streams-developer-guide-timestamp-extractor"><span class="std std-ref">Timestamp Extractor</span></a></td> </tr> - <tr class="row-odd"><td>value.serde</td> + <tr 
class="row-even"><td>value.serde</td> <td>Medium</td> <td colspan="2">Default serializer/deserializer class for record values, implements the <code class="docutils literal"><span class="pre">Serde</span></code> interface (see also key.serde).</td> <td><code class="docutils literal"><span class="pre">Serdes.ByteArray().getClass().getName()</span></code></td> </tr> - <tr class="row-even"><td>windowstore.changelog.additional.retention.ms</td> + <tr class="row-odd"><td>windowstore.changelog.additional.retention.ms</td> <td>Low</td> <td colspan="2">Added to a windows maintainMs to ensure data is not deleted from the log prematurely. Allows for clock drift.</td> <td>86400000 milliseconds = 1 day</td> @@ -309,6 +315,44 @@ </ul> </div></blockquote> </div> + <div class="section" id="default-production-exception-handler"> + <span id="streams-developer-guide-peh"></span><h4><a class="toc-backref" href="#id24">default.production.exception.handler</a><a class="headerlink" href="#default-production-exception-handler" title="Permalink to this headline"></a></h4> + <blockquote> + <div><p>The default production exception handler allows you to manage exceptions triggered when trying to interact with a broker, + such as attempting to produce a record that is too large. By default, Kafka provides and uses the <a class="reference external" href="/4.0.0/streams/javadocs/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.html">DefaultProductionExceptionHandler</a> + that always fails when these exceptions occur.</p> + + <p>Each exception handler can return <code>FAIL</code> or <code>CONTINUE</code>, depending on the record and the exception thrown. Returning <code>FAIL</code> signals that Streams should shut down, while <code>CONTINUE</code> signals that Streams + should ignore the issue and continue processing. 
If you want to provide an exception handler that always ignores records that are too large, you could implement something like the following:</p> + + <pre class="brush: java;"> + import java.util.Map; + import java.util.Properties; + import org.apache.kafka.clients.producer.ProducerRecord; + import org.apache.kafka.common.errors.RecordTooLargeException; + import org.apache.kafka.streams.StreamsConfig; + import org.apache.kafka.streams.errors.ProductionExceptionHandler; + import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse; + + class IgnoreRecordTooLargeHandler implements ProductionExceptionHandler { + public void configure(Map<String, Object> config) {} + + public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, + final Exception exception) { + if (exception instanceof RecordTooLargeException) { + return ProductionExceptionHandlerResponse.CONTINUE; + } else { + return ProductionExceptionHandlerResponse.FAIL; + } + } + } + + Properties settings = new Properties(); + + // other Kafka Streams settings, e.g. bootstrap servers, application ID, etc. + + settings.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, + IgnoreRecordTooLargeHandler.class);</pre></div> + </blockquote> + </div> <div class="section" id="default-key-serde"> <h4><a class="toc-backref" href="#id8">default.key.serde</a><a class="headerlink" href="#default-key-serde" title="Permalink to this headline"></a></h4> <blockquote> @@ -714,4 +758,4 @@ // Display docs subnav items $('.b-nav__docs').parent().toggleClass('nav__item__with__subs--expanded'); }); - </script> \ No newline at end of file + </script> diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index a878331..b218ba8 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -70,6 +70,13 @@ <li> added <code>getAdminClient(config)</code> that allows to override an <code>AdminClient</code> used for administrative requests such as internal topic creations, etc. 
</li> </ul> + <p>New error handling for exceptions during production:</p> + <ul> + <li>added interface <code>ProductionExceptionHandler</code> that allows implementors to decide whether Streams should <code>FAIL</code> or <code>CONTINUE</code> when certain exceptions occur while trying to produce.</li> + <li>provided an implementation, <code>DefaultProductionExceptionHandler</code>, that always fails, preserving the existing behavior by default.</li> + <li>changing which implementation is used can be done by setting <code>default.production.exception.handler</code> to the fully qualified name of a class implementing this interface.</li> + </ul> + <h3><a id="streams_api_changes_100" href="#streams_api_changes_100">Streams API changes in 1.0.0</a></h3> <p> @@ -260,7 +267,7 @@ If exactly-once processing is enabled via the <code>processing.guarantees</code> parameter, internally Streams switches from a producer per thread to a producer per task runtime model. In order to distinguish the different producers, the producer's <code>client.id</code> additionally encodes the task-ID for this case. Because the producer's <code>client.id</code> is used to report JMX metrics, it might be required to update tools that receive those metrics. - + </p> <p> Producer's <code>client.id</code> naming schema: </p> -- To stop receiving notification emails like this one, please contact ['"commits@kafka.apache.org" <commits@kafka.apache.org>'].
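As a side note on the documentation added in this commit: the FAIL/CONTINUE decision made by `IgnoreRecordTooLargeHandler` can be illustrated with a self-contained sketch. This is not the real Kafka API — `Response` stands in for `ProductionExceptionHandlerResponse`, and `OversizeException` is a hypothetical stand-in for `RecordTooLargeException` — so it compiles and runs without Kafka on the classpath.

```java
// Self-contained sketch of the FAIL/CONTINUE decision described in the
// config-streams.html documentation above. Plain-Java stand-ins are used:
// Response mirrors ProductionExceptionHandlerResponse, and OversizeException
// is a hypothetical stand-in for RecordTooLargeException.
public class HandlerSketch {

    enum Response { FAIL, CONTINUE }

    static class OversizeException extends RuntimeException { }

    // Mirrors IgnoreRecordTooLargeHandler.handle(): skip records that are
    // too large, shut down on any other production exception.
    static Response handle(final Exception exception) {
        return (exception instanceof OversizeException)
                ? Response.CONTINUE
                : Response.FAIL;
    }

    public static void main(final String[] args) {
        System.out.println(handle(new OversizeException())); // CONTINUE
        System.out.println(handle(new RuntimeException()));  // FAIL
    }
}
```

The same branching shape applies to any custom handler: inspect the exception (and, in the real interface, the failed `ProducerRecord`), then return CONTINUE only for errors you have decided are safe to drop.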