This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e9154b7  KAFKA-6905: Document that Processors may be re-used by Streams (#5022)
e9154b7 is described below

commit e9154b7960fd9fe9bf05811e4e9972698eeec355
Author: David Glasser <[email protected]>
AuthorDate: Wed May 16 13:10:21 2018 -0700

    KAFKA-6905: Document that Processors may be re-used by Streams (#5022)
    
    Reviewers: Guozhang Wang <[email protected]>
---
 docs/streams/developer-guide/processor-api.html                     | 6 +++++-
 .../src/main/java/org/apache/kafka/streams/processor/Processor.java | 6 ++++--
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index e3432b7..ef89372 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -79,7 +79,11 @@
                 instance, which provides access to the metadata of the currently processed record, including its source Kafka topic and partition,
                 its corresponding message offset, and further such information. You can also use this context instance to schedule a punctuation
                 function (via <code class="docutils literal"><span class="pre">ProcessorContext#schedule()</span></code>), to forward a new record as a key-value pair to the downstream processors (via <code class="docutils literal"><span class="pre">ProcessorContext#forward()</span></code>),
-                and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).</p>
+              and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
+              Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
+              <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
+              <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
+              <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
             <p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
               (1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
               (2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index bcdb2f0..e35337f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -29,7 +29,8 @@ public interface Processor<K, V> {
 
     /**
      * Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
-     * that contains it is initialized.
+     * that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the
+     * framework may later re-use the processor by calling {@link #init()} again.
      * <p>
      * The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
      * {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
@@ -49,7 +50,8 @@ public interface Processor<K, V> {
 
     /**
      * Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
-     * Thus, it is not possible to write anything to Kafka as underlying clients are already closed.
+     * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may
+     * later re-use this processor by calling {@link #init()} on it again.
      * <p>
      * Note: Do not close any streams managed resources, like {@link StateStore}s here, as they are managed by the library.
      */
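
The re-use behavior documented above (init() may be called again on the
same object after close()) can be illustrated with a small sketch. It
deliberately avoids the Kafka Streams classes so that it runs on its own:
the Lifecycle interface, the BufferingProcessor class, and the String
"context" argument are all hypothetical stand-ins, not part of the Kafka
API. The point is only the pattern: build per-run state in init() and drop
it in close(), rather than relying on the constructor or field
initializers running once per use.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusableProcessorSketch {

    /** Hypothetical stand-in for the init()/close() part of a processor lifecycle. */
    interface Lifecycle {
        void init(String context);
        void close();
    }

    /** Acquires its resources in init() and releases them in close(), so it tolerates re-use. */
    static class BufferingProcessor implements Lifecycle {
        private List<String> buffer; // created in init(), dropped in close()
        private String context;

        @Override
        public void init(String context) {
            // Rebuild all per-run state here; a second init() after close()
            // must not see anything left over from the previous run.
            this.context = context;
            this.buffer = new ArrayList<>();
        }

        void process(String value) {
            if (buffer == null) {
                throw new IllegalStateException("process() called before init()");
            }
            buffer.add(context + ":" + value);
        }

        int buffered() {
            return buffer == null ? 0 : buffer.size();
        }

        @Override
        public void close() {
            // Release resources and clear references so the object is
            // back in its pre-init() state.
            buffer = null;
            context = null;
        }
    }

    public static void main(String[] args) {
        BufferingProcessor p = new BufferingProcessor();

        p.init("run-1");
        p.process("a");
        p.process("b");
        System.out.println(p.buffered()); // prints 2
        p.close();

        // The same object is initialized again, as the framework may do.
        p.init("run-2");
        p.process("c");
        System.out.println(p.buffered()); // prints 1
        p.close();
    }
}
```

Had the buffer been created in a field initializer instead of init(), the
second run would have started with the two entries from the first run (or
with a null reference after close()), which is the kind of bug the
documentation change is warning about.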

