This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e9154b7 KAFKA-6905: Document that Processors may be re-used by Streams (#5022)
e9154b7 is described below
commit e9154b7960fd9fe9bf05811e4e9972698eeec355
Author: David Glasser <[email protected]>
AuthorDate: Wed May 16 13:10:21 2018 -0700
KAFKA-6905: Document that Processors may be re-used by Streams (#5022)
Reviewers: Guozhang Wang <[email protected]>
---
docs/streams/developer-guide/processor-api.html | 6 +++++-
.../src/main/java/org/apache/kafka/streams/processor/Processor.java | 6 ++++--
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/docs/streams/developer-guide/processor-api.html b/docs/streams/developer-guide/processor-api.html
index e3432b7..ef89372 100644
--- a/docs/streams/developer-guide/processor-api.html
+++ b/docs/streams/developer-guide/processor-api.html
@@ -79,7 +79,11 @@
instance, which provides access to the metadata of the
currently processed record, including its source Kafka topic and partition,
its corresponding message offset, and further such
information. You can also use this context instance to schedule a punctuation
function (via <code class="docutils literal"><span
class="pre">ProcessorContext#schedule()</span></code>), to forward a new record
as a key-value pair to the downstream processors (via <code class="docutils
literal"><span class="pre">ProcessorContext#forward()</span></code>),
- and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).</p>
+ and to commit the current processing progress (via <code class="docutils literal"><span class="pre">ProcessorContext#commit()</span></code>).
+ Any resources you set up in <code class="docutils literal"><span class="pre">init()</span></code> can be cleaned up in the
+ <code class="docutils literal"><span class="pre">close()</span></code> method. Note that Kafka Streams may re-use a single
+ <code class="docutils literal"><span class="pre">Processor</span></code> object by calling
+ <code class="docutils literal"><span class="pre">init()</span></code> on it again after <code class="docutils literal"><span class="pre">close()</span></code>.</p>
<p>When records are forwarded via downstream processors they also get a timestamp assigned. There are two different default behaviors:
(1) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">#process()</span></code> the output record inherits the input record timestamp.
(2) If <code class="docutils literal"><span class="pre">#forward()</span></code> is called within <code class="docutils literal"><span class="pre">punctuate()</span></code></p> the output record inherits the current punctuation timestamp (either current 'stream time' or system wall-clock time).
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
index bcdb2f0..e35337f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/Processor.java
@@ -29,7 +29,8 @@ public interface Processor<K, V> {
/**
* Initialize this processor with the given context. The framework ensures this is called once per processor when the topology
- * that contains it is initialized.
+ * that contains it is initialized. When the framework is done with the processor, {@link #close()} will be called on it; the
+ * framework may later re-use the processor by calling {@link #init()} again.
* <p>
* The provided {@link ProcessorContext context} can be used to access topology and record meta data, to
* {@link ProcessorContext#schedule(long, PunctuationType, Punctuator) schedule} a method to be
@@ -49,7 +50,8 @@ public interface Processor<K, V> {
/**
* Close this processor and clean up any resources. Be aware that {@link #close()} is called after an internal cleanup.
- * Thus, it is not possible to write anything to Kafka as underlying clients are already closed.
+ * Thus, it is not possible to write anything to Kafka as underlying clients are already closed. The framework may
+ * later re-use this processor by calling {@link #init()} on it again.
* <p>
* Note: Do not close any streams managed resources, like {@link StateStore}s here, as they are managed by the library.
*/
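
Editor's illustration (not part of the commit): the documented contract means a processor should tolerate init() being called again after close(). The sketch below shows one way to write for that, assuming all per-lifecycle state is rebuilt in init() and released in close(). LifecycleProcessor and BufferingProcessor are hypothetical stand-ins invented for this example so that it runs without the Kafka Streams library; they are not the real Processor interface, whose init() takes a ProcessorContext.

```java
import java.util.ArrayList;
import java.util.List;

public class ReusableProcessorSketch {

    // Hypothetical stand-in for an init/process/close lifecycle.
    // This is NOT org.apache.kafka.streams.processor.Processor.
    interface LifecycleProcessor<K, V> {
        void init(String contextName);
        void process(K key, V value);
        void close();
    }

    // Hypothetical processor that stays correct if init() is called again after close().
    static class BufferingProcessor implements LifecycleProcessor<String, String> {
        private List<String> buffer;   // created in init(), released in close()
        private String contextName;

        @Override
        public void init(String contextName) {
            // Rebuild every piece of per-lifecycle state here rather than in a
            // constructor or field initializer, so a second init() starts clean.
            this.contextName = contextName;
            this.buffer = new ArrayList<>();
        }

        @Override
        public void process(String key, String value) {
            if (buffer == null) {
                throw new IllegalStateException("process() called before init() or after close()");
            }
            buffer.add(key + "=" + value);
        }

        @Override
        public void close() {
            // Release only what init() created; leave the object ready for re-use.
            buffer = null;
            contextName = null;
        }

        int buffered() {
            return buffer == null ? 0 : buffer.size();
        }

        boolean isOpen() {
            return buffer != null;
        }
    }

    public static void main(String[] args) {
        BufferingProcessor p = new BufferingProcessor();
        p.init("first");
        p.process("a", "1");
        p.close();

        // The same object is initialized again, as the framework may do.
        p.init("second");
        p.process("b", "2");
        System.out.println(p.buffered()); // prints 1: nothing leaked from the first lifecycle
        p.close();
    }
}
```

A processor that allocated its buffer in a field initializer and set it to null in close() would fail on the second lifecycle, which is the pitfall the new documentation warns about.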