This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4189a36b41a MINOR: fix JavaDocs of Kafka Streams context classes
(#17049)
4189a36b41a is described below
commit 4189a36b41a0160f6015701dcc2105d4797ecd04
Author: Matthias J. Sax <[email protected]>
AuthorDate: Sat Aug 31 21:46:39 2024 -0700
MINOR: fix JavaDocs of Kafka Streams context classes (#17049)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/streams/processor/ProcessorContext.java | 9 +++++++--
.../apache/kafka/streams/processor/api/ProcessingContext.java | 8 ++++++--
.../org/apache/kafka/streams/processor/api/ProcessorContext.java | 4 ++--
3 files changed, 15 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
index f438a88cd6d..fd99985e6f8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
@@ -17,12 +17,16 @@
package org.apache.kafka.streams.processor;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.io.File;
import java.time.Duration;
@@ -292,8 +296,9 @@ public interface ProcessorContext {
* (including the currently processed record), i.e., it can be considered
a high-watermark.
* Stream-time is tracked on a per-task basis and is preserved across
restarts and during task migration.
*
- * <p> Note: this method is not supported for global processors (cf.
{@link Topology#addGlobalStore} (...)
- * and {@link StreamsBuilder#addGlobalStore} (...),
+ * <p> Note: this method is not supported for global processors (cf.
+ * {@link Topology#addGlobalStore(StoreBuilder, String, Deserializer,
Deserializer, String, String, ProcessorSupplier) Topology.addGlobalStore(...)}
+ * and {@link StreamsBuilder#addGlobalStore(StoreBuilder, String,
Consumed, ProcessorSupplier) StreamsBuilder.addGlobalStore(...)}),
* because there is no concept of stream-time for this case.
* Calling this method in a global processor will result in an {@link
UnsupportedOperationException}.
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
index 26e1f356419..78aa2c8489b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessingContext.java
@@ -16,16 +16,19 @@
*/
package org.apache.kafka.streams.processor.api;
+import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.state.StoreBuilder;
import java.io.File;
import java.time.Duration;
@@ -201,8 +204,9 @@ public interface ProcessingContext {
* (including the currently processed record), i.e., it can be considered
a high-watermark.
* Stream-time is tracked on a per-task basis and is preserved across
restarts and during task migration.
*
- * <p> Note: this method is not supported for global processors (cf.
{@link Topology#addGlobalStore} (...)
- * and {@link StreamsBuilder#addGlobalStore} (...),
+ * <p> Note: this method is not supported for global processors (cf.
+ * {@link Topology#addGlobalStore(StoreBuilder, String,
TimestampExtractor, Deserializer, Deserializer, String, String,
ProcessorSupplier) Topology.addGlobalStore(...)}
+ * and {@link StreamsBuilder#addGlobalStore(StoreBuilder, String,
Consumed, ProcessorSupplier) StreamsBuilder.addGlobalStore(...)}),
* because there is no concept of stream-time for this case.
* Calling this method in a global processor will result in an {@link
UnsupportedOperationException}.
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
index 4ce993c8887..4e5e0b1a756 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
@@ -59,7 +59,7 @@ public interface ProcessorContext<KForward, VForward> extends
ProcessingContext
* of causality and could lead to undefined behavior.
* <p>
* A safe usage would look like this:
- * <code>
+ * <pre>{@code
* process(Record inputRecord) {
* // makes a copy of the headers
* Record toForward =
inputRecord.withHeaders(inputRecord.headers());
@@ -76,7 +76,7 @@ public interface ProcessorContext<KForward, VForward> extends
ProcessingContext
* toForward.headers().add(...);
* context.forward(toForward);
* }
- * </code>
+ * }</pre>
* @param record The record to forward to all children
*/
<K extends KForward, V extends VForward> void forward(Record<K, V> record);