This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4189a36b41a MINOR: fix JavaDocs of Kafka Streams context classes 
(#17049)
4189a36b41a is described below

commit 4189a36b41a0160f6015701dcc2105d4797ecd04
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sat Aug 31 21:46:39 2024 -0700

    MINOR: fix JavaDocs of Kafka Streams context classes (#17049)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/streams/processor/ProcessorContext.java     | 9 +++++++--
 .../apache/kafka/streams/processor/api/ProcessingContext.java    | 8 ++++++--
 .../org/apache/kafka/streams/processor/api/ProcessorContext.java | 4 ++--
 3 files changed, 15 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index f438a88cd6d..fd99985e6f8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,12 +17,16 @@
 package org.apache.kafka.streams.processor;
 
 import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.io.File;
 import java.time.Duration;
@@ -292,8 +296,9 @@ public interface ProcessorContext {
      * (including the currently processed record), i.e., it can be considered 
a high-watermark.
      * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
      *
-     * <p> Note: this method is not supported for global processors (cf. 
{@link Topology#addGlobalStore} (...)
-     * and {@link StreamsBuilder#addGlobalStore} (...),
+     * <p> Note: this method is not supported for global processors (cf.
+     * {@link Topology#addGlobalStore(StoreBuilder, String, Deserializer, 
Deserializer, String, String, ProcessorSupplier)  Topology.addGlobalStore(...)}
+     * and {@link StreamsBuilder#addGlobalStore(StoreBuilder, String, 
Consumed, ProcessorSupplier) StreamsBuilder.addGlobalStore(...)}),
      * because there is no concept of stream-time for this case.
      * Calling this method in a global processor will result in an {@link 
UnsupportedOperationException}.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
index 26e1f356419..78aa2c8489b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
@@ -16,16 +16,19 @@
  */
 package org.apache.kafka.streams.processor.api;
 
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsMetrics;
 import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.StoreBuilder;
 
 import java.io.File;
 import java.time.Duration;
@@ -201,8 +204,9 @@ public interface ProcessingContext {
      * (including the currently processed record), i.e., it can be considered 
a high-watermark.
      * Stream-time is tracked on a per-task basis and is preserved across 
restarts and during task migration.
      *
-     * <p> Note: this method is not supported for global processors (cf. 
{@link Topology#addGlobalStore} (...)
-     * and {@link StreamsBuilder#addGlobalStore} (...),
+     * <p> Note: this method is not supported for global processors (cf.
+     * {@link Topology#addGlobalStore(StoreBuilder, String, 
TimestampExtractor, Deserializer, Deserializer, String, String, 
ProcessorSupplier) Topology#addGlobalStore(...)}
+     * and {@link StreamsBuilder#addGlobalStore(StoreBuilder, String, 
Consumed, ProcessorSupplier) StreamsBuilder.addGlobalStore(...)}),
      * because there is no concept of stream-time for this case.
      * Calling this method in a global processor will result in an {@link 
UnsupportedOperationException}.
      *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index 4ce993c8887..4e5e0b1a756 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -59,7 +59,7 @@ public interface ProcessorContext<KForward, VForward> extends 
ProcessingContext
      * of causality and could lead to undefined behavior.
      * <p>
      * A safe usage would look like this:
-     * <code>
+     * <pre>{@code
      *     process(Record inputRecord) {
      *         // makes a copy of the headers
      *         Record toForward = 
inputRecord.withHeaders(inputRecord.headers());
@@ -76,7 +76,7 @@ public interface ProcessorContext<KForward, VForward> extends 
ProcessingContext
      *         toForward.headers().add(...);
      *         context.forward(toForward);
      *     }
-     * </code>
+     * }</pre>
      * @param record The record to forward to all children
      */
     <K extends KForward, V extends VForward> void forward(Record<K, V> record);

Reply via email to