This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 240efbb99d9 MINOR: improve JavaDocs for Kafka Streams exceptions and 
error handlers (#17856)
240efbb99d9 is described below

commit 240efbb99d9ee489251db0a8110936ad9eb0eb48
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Nov 21 11:46:23 2024 -0800

    MINOR: improve JavaDocs for Kafka Streams exceptions and error handlers 
(#17856)
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../streams/errors/BrokerNotFoundException.java    |  2 +-
 .../errors/DefaultProductionExceptionHandler.java  |  1 +
 .../errors/DeserializationExceptionHandler.java    | 39 +++++++++++------
 .../kafka/streams/errors/ErrorHandlerContext.java  | 19 ++++-----
 .../streams/errors/InvalidStateStoreException.java |  3 +-
 .../InvalidStateStorePartitionException.java       |  4 +-
 .../apache/kafka/streams/errors/LockException.java |  1 -
 .../errors/LogAndContinueExceptionHandler.java     | 26 ++++++++----
 .../LogAndContinueProcessingExceptionHandler.java  | 13 ++++--
 .../streams/errors/LogAndFailExceptionHandler.java | 28 ++++++++-----
 .../LogAndFailProcessingExceptionHandler.java      | 13 ++++--
 .../streams/errors/ProcessingExceptionHandler.java | 19 +++++----
 .../streams/errors/ProcessorStateException.java    |  1 -
 .../streams/errors/ProductionExceptionHandler.java | 49 ++++++++++++++++------
 .../errors/StateStoreMigratedException.java        |  1 +
 .../errors/StateStoreNotAvailableException.java    |  1 +
 .../kafka/streams/errors/StreamsException.java     |  6 +--
 .../streams/errors/StreamsNotStartedException.java |  1 +
 .../errors/StreamsRebalancingException.java        |  1 +
 .../streams/errors/StreamsStoppedException.java    |  1 +
 .../errors/StreamsUncaughtExceptionHandler.java    | 17 ++++++--
 .../streams/errors/TaskCorruptedException.java     |  4 +-
 .../streams/errors/TaskIdFormatException.java      |  1 +
 .../kafka/streams/errors/TopologyException.java    |  1 -
 .../streams/errors/UnknownStateStoreException.java |  1 +
 .../streams/errors/UnknownTopologyException.java   |  1 +
 26 files changed, 170 insertions(+), 84 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
index d2df27f8d34..bab44a65357 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
@@ -16,13 +16,13 @@
  */
 package org.apache.kafka.streams.errors;
 
-
 /**
  * Indicates that none of the specified {@link 
org.apache.kafka.streams.StreamsConfig#BOOTSTRAP_SERVERS_CONFIG brokers}
  * could be found.
  *
  * @see org.apache.kafka.streams.StreamsConfig
  */
+@SuppressWarnings("unused")
 public class BrokerNotFoundException extends StreamsException {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
index d6cc8e915e7..8b1562861c4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
@@ -26,6 +26,7 @@ import java.util.Map;
  * happens while attempting to produce result records.
  */
 public class DefaultProductionExceptionHandler implements 
ProductionExceptionHandler {
+    @SuppressWarnings("deprecation")
     @Deprecated
     @Override
     public ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
index 198a97cce44..0b44e04d791 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -29,14 +29,20 @@ public interface DeserializationExceptionHandler extends 
Configurable {
 
     /**
      * Inspect a record and the exception received.
-     * <p>
-     * Note, that the passed in {@link ProcessorContext} only allows to access 
metadata like the task ID.
+     *
+     * <p> Note, that the passed in {@link ProcessorContext} only allows to 
access metadata like the task ID.
      * However, it cannot be used to emit records via {@link 
ProcessorContext#forward(Object, Object)};
      * calling {@code forward()} (and some other methods) would result in a 
runtime exception.
      *
-     * @param context processor context
-     * @param record record that failed deserialization
-     * @param exception the actual exception
+     * @param context
+     *     Processor context.
+     * @param record
+     *     Record that failed deserialization.
+     * @param exception
+     *     The actual exception.
+     *
+     * @return Whether to continue or stop processing.
+     *
      * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ConsumerRecord, Exception)} instead.
      */
     @Deprecated
@@ -49,9 +55,14 @@ public interface DeserializationExceptionHandler extends 
Configurable {
     /**
      * Inspect a record and the exception received.
      *
-     * @param context error handler context
-     * @param record record that failed deserialization
-     * @param exception the actual exception
+     * @param context
+     *     Error handler context.
+     * @param record
+     *     Record that failed deserialization.
+     * @param exception
+     *     The actual exception.
+     *
+     * @return Whether to continue or stop processing.
      */
     default DeserializationHandlerResponse handle(final ErrorHandlerContext 
context,
                                                   final ConsumerRecord<byte[], 
byte[]> record,
@@ -63,15 +74,19 @@ public interface DeserializationExceptionHandler extends 
Configurable {
      * Enumeration that describes the response from the exception handler.
      */
     enum DeserializationHandlerResponse {
-        /* continue with processing */
+        /** Continue processing. */
         CONTINUE(0, "CONTINUE"),
-        /* fail the processing and stop */
+        /** Fail processing. */
         FAIL(1, "FAIL");
 
-        /** an english description of the api--this is for debugging and can 
change */
+        /**
+         * An English description for the used option. This is for debugging 
only and may change.
+         */
         public final String name;
 
-        /** the permanent and immutable id of an API--this can't change ever */
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
         public final int id;
 
         DeserializationHandlerResponse(final int id, final String name) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
index 82d32581255..d471673a48e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 
-
 /**
  * This interface allows user code to inspect the context of a record that has 
failed during processing.
  *
@@ -48,7 +47,7 @@ public interface ErrorHandlerContext {
      * Additionally, when writing into a changelog topic, there is no 
associated input record,
      * and thus no topic name is available.
      *
-     * @return the topic name
+     * @return The topic name.
      */
     String topic();
 
@@ -66,7 +65,7 @@ public interface ErrorHandlerContext {
      * Additionally, when writing into a changelog topic, there is no 
associated input record,
      * and thus no partition is available.
      *
-     * @return the partition ID
+     * @return The partition ID.
      */
     int partition();
 
@@ -84,7 +83,7 @@ public interface ErrorHandlerContext {
      * Additionally, when writing into a changelog topic, there is no 
associated input record,
      * and thus no offset is available.
      *
-     * @return the offset
+     * @return The offset.
      */
     long offset();
 
@@ -102,21 +101,21 @@ public interface ErrorHandlerContext {
      * Additionally, when writing into a changelog topic, there is no 
associated input record,
      * and thus no headers are available.
      *
-     * @return the headers
+     * @return The headers.
      */
     Headers headers();
 
     /**
      * Return the current processor node ID.
      *
-     * @return the processor node ID
+     * @return The processor node ID.
      */
     String processorNodeId();
 
     /**
      * Return the task ID.
      *
-     * @return the task ID
+     * @return The task ID.
      */
     TaskId taskId();
 
@@ -138,14 +137,14 @@ public interface ErrorHandlerContext {
      * if this method is invoked from the punctuate call):
      * <ul>
      *   <li>In case of {@link PunctuationType#STREAM_TIME} timestamp is 
defined as the current task's stream time,
-     *   which is defined as the largest timestamp of any record processed by 
the task
-     *   <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is 
defined the current system time
+     *   which is defined as the largest timestamp of any record processed by 
the task</li>
+     *   <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is 
defined as the current system time</li>
      * </ul>
      *
      * <p> If it is triggered from a deserialization failure, timestamp is 
defined as the timestamp of the
      * current rawRecord {@link 
org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
      *
-     * @return the timestamp
+     * @return The timestamp.
      */
     long timestamp();
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
index 6ad06a54096..f5fbef72547 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
@@ -16,10 +16,9 @@
  */
 package org.apache.kafka.streams.errors;
 
-
 /**
  * Indicates that there was a problem when trying to access a {@link 
org.apache.kafka.streams.processor.StateStore StateStore}.
- * {@code InvalidStateStoreException} is not thrown directly but only its 
following sub-classes.
+ * {@code InvalidStateStoreException} is not thrown directly but only its 
following subclasses.
  */
 public class InvalidStateStoreException extends StreamsException {
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
index e85a0375c42..f27926bfc65 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
@@ -21,7 +21,8 @@ import org.apache.kafka.streams.KafkaStreams;
 /**
  * Indicates that the specific state store being queried via
  * {@link org.apache.kafka.streams.StoreQueryParameters} used a partitioning 
that is not assigned to this instance.
- * You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover 
the correct instance that hosts the requested partition.
+ * You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover 
the correct instance
+ * that hosts the requested partition.
  */
 public class InvalidStateStorePartitionException extends 
InvalidStateStoreException {
 
@@ -31,6 +32,7 @@ public class InvalidStateStorePartitionException extends 
InvalidStateStoreExcept
         super(message);
     }
 
+    @SuppressWarnings("unused")
     public InvalidStateStorePartitionException(final String message, final 
Throwable throwable) {
         super(message, throwable);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java 
b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
index 80bb592b4d4..a16171f5ee4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.errors;
 
-
 /**
  * Indicates that the state store directory lock could not be acquired because 
another thread holds the lock.
  *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index a93b7c99517..3622da137c1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.errors;
 
-
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.streams.processor.ProcessorContext;
 
@@ -32,16 +31,21 @@ import java.util.Map;
 public class LogAndContinueExceptionHandler implements 
DeserializationExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
 
+    @SuppressWarnings("deprecation")
     @Deprecated
     @Override
     public DeserializationHandlerResponse handle(final ProcessorContext 
context,
                                                  final ConsumerRecord<byte[], 
byte[]> record,
                                                  final Exception exception) {
 
-        log.warn("Exception caught during Deserialization, " +
-                 "taskId: {}, topic: {}, partition: {}, offset: {}",
-                 context.taskId(), record.topic(), record.partition(), 
record.offset(),
-                 exception);
+        log.warn(
+            "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
+            context.taskId(),
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            exception
+        );
 
         return DeserializationHandlerResponse.CONTINUE;
     }
@@ -51,10 +55,14 @@ public class LogAndContinueExceptionHandler implements 
DeserializationExceptionH
                                                  final ConsumerRecord<byte[], 
byte[]> record,
                                                  final Exception exception) {
 
-        log.warn("Exception caught during Deserialization, " +
-                 "taskId: {}, topic: {}, partition: {}, offset: {}",
-                 context.taskId(), record.topic(), record.partition(), 
record.offset(),
-                 exception);
+        log.warn(
+            "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
+            context.taskId(),
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            exception
+        );
 
         return DeserializationHandlerResponse.CONTINUE;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
index 113510d5889..c832ab14200 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
@@ -32,10 +32,15 @@ public class LogAndContinueProcessingExceptionHandler 
implements ProcessingExcep
 
     @Override
     public ProcessingHandlerResponse handle(final ErrorHandlerContext context, 
final Record<?, ?> record, final Exception exception) {
-        log.warn("Exception caught during message processing, " +
-                "processor node: {}, taskId: {}, source topic: {}, source 
partition: {}, source offset: {}",
-            context.processorNodeId(), context.taskId(), context.topic(), 
context.partition(), context.offset(),
-            exception);
+        log.warn(
+            "Exception caught during message processing, processor node: {}, 
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
+            context.processorNodeId(),
+            context.taskId(),
+            context.topic(),
+            context.partition(),
+            context.offset(),
+            exception
+        );
 
         return ProcessingHandlerResponse.CONTINUE;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 5fdda623bdd..aaef5ca050b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
-
 /**
  * Deserialization handler that logs a deserialization exception and then
  * signals the processing pipeline to stop processing more records and fail.
@@ -32,16 +31,21 @@ import java.util.Map;
 public class LogAndFailExceptionHandler implements 
DeserializationExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
 
-    @Override
+    @SuppressWarnings("deprecation")
     @Deprecated
+    @Override
     public DeserializationHandlerResponse handle(final ProcessorContext 
context,
                                                  final ConsumerRecord<byte[], 
byte[]> record,
                                                  final Exception exception) {
 
-        log.error("Exception caught during Deserialization, " +
-                  "taskId: {}, topic: {}, partition: {}, offset: {}",
-                  context.taskId(), record.topic(), record.partition(), 
record.offset(),
-                  exception);
+        log.error(
+            "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
+            context.taskId(),
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            exception
+        );
 
         return DeserializationHandlerResponse.FAIL;
     }
@@ -51,10 +55,14 @@ public class LogAndFailExceptionHandler implements 
DeserializationExceptionHandl
                                                  final ConsumerRecord<byte[], 
byte[]> record,
                                                  final Exception exception) {
 
-        log.error("Exception caught during Deserialization, " +
-                  "taskId: {}, topic: {}, partition: {}, offset: {}",
-                  context.taskId(), record.topic(), record.partition(), 
record.offset(),
-                  exception);
+        log.error(
+            "Exception caught during Deserialization, taskId: {}, topic: {}, 
partition: {}, offset: {}",
+            context.taskId(),
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            exception
+        );
 
         return DeserializationHandlerResponse.FAIL;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
index 9c2cf91c605..f592663a6c0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
@@ -32,10 +32,15 @@ public class LogAndFailProcessingExceptionHandler 
implements ProcessingException
 
     @Override
     public ProcessingHandlerResponse handle(final ErrorHandlerContext context, 
final Record<?, ?> record, final Exception exception) {
-        log.error("Exception caught during message processing, " +
-                "processor node: {}, taskId: {}, source topic: {}, source 
partition: {}, source offset: {}",
-            context.processorNodeId(), context.taskId(), context.topic(), 
context.partition(), context.offset(),
-            exception);
+        log.error(
+            "Exception caught during message processing, processor node: {}, 
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
+            context.processorNodeId(),
+            context.taskId(),
+            context.topic(),
+            context.partition(),
+            context.offset(),
+            exception
+        );
 
         return ProcessingHandlerResponse.FAIL;
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
index 33b2596be12..7dc1b90bc2e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
@@ -26,25 +26,30 @@ public interface ProcessingExceptionHandler extends 
Configurable {
     /**
      * Inspect a record and the exception received
      *
-     * @param context processing context metadata
-     * @param record record where the exception occurred
-     * @param exception the actual exception
+     * @param context
+     *     Processing context metadata.
+     * @param record
+     *     Record where the exception occurred.
+     * @param exception
+     *     The actual exception.
+     *
+     * @return Whether to continue or stop processing.
      */
     ProcessingHandlerResponse handle(final ErrorHandlerContext context, final 
Record<?, ?> record, final Exception exception);
 
     enum ProcessingHandlerResponse {
-        /* continue with processing */
+        /** Continue processing. */
         CONTINUE(1, "CONTINUE"),
-        /* fail the processing and stop */
+        /** Fail processing. */
         FAIL(2, "FAIL");
 
         /**
-         * the permanent and immutable name of processing exception response
+         * An English description for the used option. This is for debugging 
only and may change.
          */
         public final String name;
 
         /**
-         * the permanent and immutable id of processing exception response
+         * The permanent and immutable id for the used option. This can't 
change ever.
          */
         public final int id;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
index 44636043786..1e40b79fd86 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.errors;
 
-
 /**
  * Indicates a processor state operation (e.g. put, get) has failed.
  *
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index 95127887b36..ed6b38a5692 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -26,10 +26,15 @@ import org.apache.kafka.common.Configurable;
 public interface ProductionExceptionHandler extends Configurable {
     /**
      * Inspect a record that we attempted to produce, and the exception that 
resulted
-     * from attempting to produce it and determine whether or not to continue 
processing.
+     * from attempting to produce it and determine whether to continue or stop 
processing.
+     *
+     * @param record
+     *     The record that failed to produce.
+     * @param exception
+     *     The exception that occurred during production.
+     *
+     * @return Whether to continue or stop processing, or retry the failed 
operation.
      *
-     * @param record The record that failed to produce
-     * @param exception The exception that occurred during production
      * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ProducerRecord, Exception)} instead.
      */
     @Deprecated
@@ -40,11 +45,16 @@ public interface ProductionExceptionHandler extends 
Configurable {
 
     /**
      * Inspect a record that we attempted to produce, and the exception that 
resulted
-     * from attempting to produce it and determine whether or not to continue 
processing.
+     * from attempting to produce it and determine whether to continue or stop 
processing.
+     *
+     * @param context
+     *     The error handler context metadata.
+     * @param record
+     *     The record that failed to produce.
+     * @param exception
+     *     The exception that occurred during production.
      *
-     * @param context The error handler context metadata
-     * @param record The record that failed to produce
-     * @param exception The exception that occurred during production
+     * @return Whether to continue or stop processing, or retry the failed 
operation.
      */
     default ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
                                                       final 
ProducerRecord<byte[], byte[]> record,
@@ -56,10 +66,16 @@ public interface ProductionExceptionHandler extends 
Configurable {
      * Handles serialization exception and determine if the process should 
continue. The default implementation is to
      * fail the process.
      *
-     * @param record        the record that failed to serialize
-     * @param exception     the exception that occurred during serialization
+     * @param record
+     *     The record that failed to serialize.
+     * @param exception
+     *     The exception that occurred during serialization.
+     *
+     * @return Whether to continue or stop processing, or retry the failed 
operation.
+     *
      * @deprecated Since 3.9. Use {@link 
#handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception, 
SerializationExceptionOrigin)} instead.
      */
+    @SuppressWarnings({"rawtypes", "unused"})
     @Deprecated
     default ProductionExceptionHandlerResponse 
handleSerializationException(final ProducerRecord record,
                                                                             
final Exception exception) {
@@ -70,11 +86,18 @@ public interface ProductionExceptionHandler extends 
Configurable {
      * Handles serialization exception and determine if the process should 
continue. The default implementation is to
      * fail the process.
      *
-     * @param context   the error handler context metadata
-     * @param record    the record that failed to serialize
-     * @param exception the exception that occurred during serialization
-     * @param origin    the origin of the serialization exception
+     * @param context
+     *     The error handler context metadata.
+     * @param record
+     *     The record that failed to serialize.
+     * @param exception
+     *     The exception that occurred during serialization.
+     * @param origin
+     *     The origin of the serialization exception.
+     *
+     * @return Whether to continue or stop processing, or retry the failed 
operation.
      */
+    @SuppressWarnings("rawtypes")
     default ProductionExceptionHandlerResponse 
handleSerializationException(final ErrorHandlerContext context,
                                                                             
final ProducerRecord record,
                                                                             
final Exception exception,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
index 45329c8101b..ed74d972a6d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
@@ -23,6 +23,7 @@ package org.apache.kafka.streams.errors;
  * This could happen because the store moved to some other instance during a 
rebalance so
  * rediscovery of the state store is required before retrying.
  */
+@SuppressWarnings("unused")
 public class StateStoreMigratedException extends InvalidStateStoreException {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
index 7cec17c40d6..26660b2ea7e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
@@ -22,6 +22,7 @@ package org.apache.kafka.streams.errors;
  * {@link org.apache.kafka.streams.KafkaStreams.State#NOT_RUNNING NOT_RUNNING} 
or
  * {@link org.apache.kafka.streams.KafkaStreams.State#ERROR ERROR} state.
  */
+@SuppressWarnings("unused")
 public class StateStoreNotAvailableException extends 
InvalidStateStoreException {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
index c3c03f53001..07cdeb4bc3b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
@@ -61,9 +61,9 @@ public class StreamsException extends KafkaException {
     }
 
     /**
-     * @return  the {@link TaskId} that this exception originated from, or 
{@link Optional#empty()} if the exception
-     *          cannot be traced back to a particular task. Note that the 
{@code TaskId} being empty does not
-     *          guarantee that the exception wasn't directly related to a 
specific task.
+     * @return The {@link TaskId} that this exception originated from, or 
{@link Optional#empty()} if the exception
+     *         cannot be traced back to a particular task. Note that the 
{@code TaskId} being empty does not
+     *         guarantee that the exception wasn't directly related to a 
specific task.
      */
     public Optional<TaskId> taskId() {
         return Optional.ofNullable(taskId);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
index 562be0ebde8..1179c07f422 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
@@ -32,6 +32,7 @@ public class StreamsNotStartedException extends 
InvalidStateStoreException {
         super(message);
     }
 
+    @SuppressWarnings("unused")
     public StreamsNotStartedException(final String message, final Throwable 
throwable) {
         super(message, throwable);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
index 4b8e14c9b65..7c6c027d266 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
@@ -21,6 +21,7 @@ package org.apache.kafka.streams.errors;
 * cannot be queried by default. You can retry the query after the rebalance 
finishes. As an alternative, you can also query
  * (potentially stale) state stores during a rebalance via {@link 
org.apache.kafka.streams.StoreQueryParameters#enableStaleStores()}.
  */
+@SuppressWarnings("unused")
 public class StreamsRebalancingException extends InvalidStateStoreException {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
index c05708bc921..252edbe3143 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
@@ -32,6 +32,7 @@ public class StreamsStoppedException extends 
InvalidStateStoreException {
         super(message);
     }
 
+    @SuppressWarnings("unused")
     public StreamsStoppedException(final String message, final Throwable 
throwable) {
         super(message, throwable);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
index 5502b35bc21..2e76d5a0788 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
@@ -19,7 +19,11 @@ package org.apache.kafka.streams.errors;
 public interface StreamsUncaughtExceptionHandler {
     /**
      * Inspect the exception received in a stream thread and respond with an 
action.
-     * @param exception the actual exception
+     *
+     * @param exception
+     *     The actual exception.
+     *
+     * @return Whether to replace the failed thread, or to shut down the 
client or the whole application.
      */
     StreamThreadExceptionResponse handle(final Throwable exception);
 
@@ -27,14 +31,21 @@ public interface StreamsUncaughtExceptionHandler {
      * Enumeration that describes the response from the exception handler.
      */
     enum StreamThreadExceptionResponse {
+        /** Replace the failed thread with a new one. */
         REPLACE_THREAD(0, "REPLACE_THREAD"),
+        /** Shut down the client. */
         SHUTDOWN_CLIENT(1, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+        /** Try to shut down the whole application. */
         SHUTDOWN_APPLICATION(2, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
 
-        /** an english description of the api--this is for debugging and can 
change */
+        /**
+         * An English description for the used option. This is for debugging 
only and may change.
+         */
         public final String name;
 
-        /** the permanent and immutable id of an API--this can't change ever */
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
         public final int id;
 
         StreamThreadExceptionResponse(final int id, final String name) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 0f6c50579d3..8fda0fb19b1 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -24,10 +24,10 @@ import java.util.Set;
 /**
 * Indicates a specific task is corrupted and needs to be re-initialized. It 
can be thrown when:
  *
- * <ol>
+ * <ul>
  *   <li>Under EOS, if the checkpoint file does not contain offsets for 
corresponding store's changelogs, meaning previously it was not closed 
cleanly.</li>
  *   <li>Out-of-range exception thrown during restoration, meaning that the 
changelog has been modified and we re-bootstrap the store.</li>
- * </ol>
+ * </ul>
  */
 public class TaskCorruptedException extends StreamsException {
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
index 63493436816..04e60ffb6c4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
@@ -35,6 +35,7 @@ public class TaskIdFormatException extends StreamsException {
         super("Task id cannot be parsed correctly" + (message == null ? "" : " 
from " + message), throwable);
     }
 
+    @SuppressWarnings("unused")
     public TaskIdFormatException(final Throwable throwable) {
         super(throwable);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java 
b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
index 1eaef0691b8..30ed93f2aa0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams.errors;
 
-
 /**
  * Indicates a pre run time error occurred while parsing the {@link 
org.apache.kafka.streams.Topology logical topology}
  * to construct the {@link 
org.apache.kafka.streams.processor.internals.ProcessorTopology physical 
processor topology}.
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
index 0ee0658bec4..8fffe89be2e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
@@ -28,6 +28,7 @@ public class UnknownStateStoreException extends 
InvalidStateStoreException {
         super(message);
     }
 
+    @SuppressWarnings("unused")
     public UnknownStateStoreException(final String message, final Throwable 
throwable) {
         super(message, throwable);
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
 
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
index d7644841a23..accac453a3d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
@@ -27,6 +27,7 @@ public class UnknownTopologyException extends 
StreamsException {
         super(message + " due to being unable to locate a Topology named " + 
namedTopology);
     }
 
+    @SuppressWarnings("unused")
     public UnknownTopologyException(final String message, final Throwable 
throwable, final String namedTopology) {
         super(message + " due to being unable to locate a Topology named " + 
namedTopology, throwable);
     }


Reply via email to