This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 240efbb99d9 MINOR: improve JavaDocs for Kafka Streams exceptions and
error handlers (#17856)
240efbb99d9 is described below
commit 240efbb99d9ee489251db0a8110936ad9eb0eb48
Author: Matthias J. Sax <[email protected]>
AuthorDate: Thu Nov 21 11:46:23 2024 -0800
MINOR: improve JavaDocs for Kafka Streams exceptions and error handlers
(#17856)
Reviewers: Bill Bejeck <[email protected]>
---
.../streams/errors/BrokerNotFoundException.java | 2 +-
.../errors/DefaultProductionExceptionHandler.java | 1 +
.../errors/DeserializationExceptionHandler.java | 39 +++++++++++------
.../kafka/streams/errors/ErrorHandlerContext.java | 19 ++++-----
.../streams/errors/InvalidStateStoreException.java | 3 +-
.../InvalidStateStorePartitionException.java | 4 +-
.../apache/kafka/streams/errors/LockException.java | 1 -
.../errors/LogAndContinueExceptionHandler.java | 26 ++++++++----
.../LogAndContinueProcessingExceptionHandler.java | 13 ++++--
.../streams/errors/LogAndFailExceptionHandler.java | 28 ++++++++-----
.../LogAndFailProcessingExceptionHandler.java | 13 ++++--
.../streams/errors/ProcessingExceptionHandler.java | 19 +++++----
.../streams/errors/ProcessorStateException.java | 1 -
.../streams/errors/ProductionExceptionHandler.java | 49 ++++++++++++++++------
.../errors/StateStoreMigratedException.java | 1 +
.../errors/StateStoreNotAvailableException.java | 1 +
.../kafka/streams/errors/StreamsException.java | 6 +--
.../streams/errors/StreamsNotStartedException.java | 1 +
.../errors/StreamsRebalancingException.java | 1 +
.../streams/errors/StreamsStoppedException.java | 1 +
.../errors/StreamsUncaughtExceptionHandler.java | 17 ++++++--
.../streams/errors/TaskCorruptedException.java | 4 +-
.../streams/errors/TaskIdFormatException.java | 1 +
.../kafka/streams/errors/TopologyException.java | 1 -
.../streams/errors/UnknownStateStoreException.java | 1 +
.../streams/errors/UnknownTopologyException.java | 1 +
26 files changed, 170 insertions(+), 84 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
index d2df27f8d34..bab44a65357 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/BrokerNotFoundException.java
@@ -16,13 +16,13 @@
*/
package org.apache.kafka.streams.errors;
-
/**
* Indicates that none of the specified {@link
org.apache.kafka.streams.StreamsConfig#BOOTSTRAP_SERVERS_CONFIG brokers}
* could be found.
*
* @see org.apache.kafka.streams.StreamsConfig
*/
+@SuppressWarnings("unused")
public class BrokerNotFoundException extends StreamsException {
private static final long serialVersionUID = 1L;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
index d6cc8e915e7..8b1562861c4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
@@ -26,6 +26,7 @@ import java.util.Map;
* happens while attempting to produce result records.
*/
public class DefaultProductionExceptionHandler implements
ProductionExceptionHandler {
+ @SuppressWarnings("deprecation")
@Deprecated
@Override
public ProductionExceptionHandlerResponse handle(final
ProducerRecord<byte[], byte[]> record,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
index 198a97cce44..0b44e04d791 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java
@@ -29,14 +29,20 @@ public interface DeserializationExceptionHandler extends
Configurable {
/**
* Inspect a record and the exception received.
- * <p>
- * Note, that the passed in {@link ProcessorContext} only allows to access
metadata like the task ID.
+ *
+ * <p> Note, that the passed in {@link ProcessorContext} only allows to
access metadata like the task ID.
* However, it cannot be used to emit records via {@link
ProcessorContext#forward(Object, Object)};
* calling {@code forward()} (and some other methods) would result in a
runtime exception.
*
- * @param context processor context
- * @param record record that failed deserialization
- * @param exception the actual exception
+ * @param context
+ * Processor context.
+ * @param record
+ * Record that failed deserialization.
+ * @param exception
+ * The actual exception.
+ *
+ * @return Whether to continue or stop processing.
+ *
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext,
ConsumerRecord, Exception)} instead.
*/
@Deprecated
@@ -49,9 +55,14 @@ public interface DeserializationExceptionHandler extends
Configurable {
/**
* Inspect a record and the exception received.
*
- * @param context error handler context
- * @param record record that failed deserialization
- * @param exception the actual exception
+ * @param context
+ * Error handler context.
+ * @param record
+ * Record that failed deserialization.
+ * @param exception
+ * The actual exception.
+ *
+ * @return Whether to continue or stop processing.
*/
default DeserializationHandlerResponse handle(final ErrorHandlerContext
context,
final ConsumerRecord<byte[],
byte[]> record,
@@ -63,15 +74,19 @@ public interface DeserializationExceptionHandler extends
Configurable {
* Enumeration that describes the response from the exception handler.
*/
enum DeserializationHandlerResponse {
- /* continue with processing */
+ /** Continue processing. */
CONTINUE(0, "CONTINUE"),
- /* fail the processing and stop */
+ /** Fail processing. */
FAIL(1, "FAIL");
- /** an english description of the api--this is for debugging and can
change */
+ /**
+ * An english description for the used option. This is for debugging
only and may change.
+ */
public final String name;
- /** the permanent and immutable id of an API--this can't change ever */
+ /**
+ * The permanent and immutable id for the used option. This can't
change ever.
+ */
public final int id;
DeserializationHandlerResponse(final int id, final String name) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
index 82d32581255..d471673a48e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ErrorHandlerContext.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
-
/**
* This interface allows user code to inspect the context of a record that has
failed during processing.
*
@@ -48,7 +47,7 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no
associated input record,
* and thus no topic name is available.
*
- * @return the topic name
+ * @return The topic name.
*/
String topic();
@@ -66,7 +65,7 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no
associated input record,
* and thus no partition is available.
*
- * @return the partition ID
+ * @return The partition ID.
*/
int partition();
@@ -84,7 +83,7 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no
associated input record,
* and thus no offset is available.
*
- * @return the offset
+ * @return The offset.
*/
long offset();
@@ -102,21 +101,21 @@ public interface ErrorHandlerContext {
* Additionally, when writing into a changelog topic, there is no
associated input record,
* and thus no headers are available.
*
- * @return the headers
+ * @return The headers.
*/
Headers headers();
/**
* Return the current processor node ID.
*
- * @return the processor node ID
+ * @return The processor node ID.
*/
String processorNodeId();
/**
* Return the task ID.
*
- * @return the task ID
+ * @return The task ID.
*/
TaskId taskId();
@@ -138,14 +137,14 @@ public interface ErrorHandlerContext {
* if this method is invoked from the punctuate call):
* <ul>
* <li>In case of {@link PunctuationType#STREAM_TIME} timestamp is
defined as the current task's stream time,
- * which is defined as the largest timestamp of any record processed by
the task
- * <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is
defined the current system time
+ * which is defined as the largest timestamp of any record processed by
the task</li>
+ * <li>In case of {@link PunctuationType#WALL_CLOCK_TIME} timestamp is
defined the current system time</li>
* </ul>
*
* <p> If it is triggered from a deserialization failure, timestamp is
defined as the timestamp of the
* current rawRecord {@link
org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
*
- * @return the timestamp
+ * @return The timestamp.
*/
long timestamp();
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
index 6ad06a54096..f5fbef72547 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
@@ -16,10 +16,9 @@
*/
package org.apache.kafka.streams.errors;
-
/**
* Indicates that there was a problem when trying to access a {@link
org.apache.kafka.streams.processor.StateStore StateStore}.
- * {@code InvalidStateStoreException} is not thrown directly but only its
following sub-classes.
+ * {@code InvalidStateStoreException} is not thrown directly but only its
following subclasses.
*/
public class InvalidStateStoreException extends StreamsException {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
index e85a0375c42..f27926bfc65 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStorePartitionException.java
@@ -21,7 +21,8 @@ import org.apache.kafka.streams.KafkaStreams;
/**
* Indicates that the specific state store being queried via
* {@link org.apache.kafka.streams.StoreQueryParameters} used a partitioning
that is not assigned to this instance.
- * You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover
the correct instance that hosts the requested partition.
+ * You can use {@link KafkaStreams#metadataForAllStreamsClients()} to discover
the correct instance
+ * that hosts the requested partition.
*/
public class InvalidStateStorePartitionException extends
InvalidStateStoreException {
@@ -31,6 +32,7 @@ public class InvalidStateStorePartitionException extends
InvalidStateStoreExcept
super(message);
}
+ @SuppressWarnings("unused")
public InvalidStateStorePartitionException(final String message, final
Throwable throwable) {
super(message, throwable);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
index 80bb592b4d4..a16171f5ee4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/LockException.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.errors;
-
/**
* Indicates that the state store directory lock could not be acquired because
another thread holds the lock.
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
index a93b7c99517..3622da137c1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.errors;
-
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -32,16 +31,21 @@ import java.util.Map;
public class LogAndContinueExceptionHandler implements
DeserializationExceptionHandler {
private static final Logger log =
LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
+ @SuppressWarnings("deprecation")
@Deprecated
@Override
public DeserializationHandlerResponse handle(final ProcessorContext
context,
final ConsumerRecord<byte[],
byte[]> record,
final Exception exception) {
- log.warn("Exception caught during Deserialization, " +
- "taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(), record.topic(), record.partition(),
record.offset(),
- exception);
+ log.warn(
+ "Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
+ context.taskId(),
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ exception
+ );
return DeserializationHandlerResponse.CONTINUE;
}
@@ -51,10 +55,14 @@ public class LogAndContinueExceptionHandler implements
DeserializationExceptionH
final ConsumerRecord<byte[],
byte[]> record,
final Exception exception) {
- log.warn("Exception caught during Deserialization, " +
- "taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(), record.topic(), record.partition(),
record.offset(),
- exception);
+ log.warn(
+ "Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
+ context.taskId(),
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ exception
+ );
return DeserializationHandlerResponse.CONTINUE;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
index 113510d5889..c832ab14200 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java
@@ -32,10 +32,15 @@ public class LogAndContinueProcessingExceptionHandler
implements ProcessingExcep
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
final Record<?, ?> record, final Exception exception) {
- log.warn("Exception caught during message processing, " +
- "processor node: {}, taskId: {}, source topic: {}, source
partition: {}, source offset: {}",
- context.processorNodeId(), context.taskId(), context.topic(),
context.partition(), context.offset(),
- exception);
+ log.warn(
+ "Exception caught during message processing, processor node: {},
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
+ context.processorNodeId(),
+ context.taskId(),
+ context.topic(),
+ context.partition(),
+ context.offset(),
+ exception
+ );
return ProcessingHandlerResponse.CONTINUE;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
index 5fdda623bdd..aaef5ca050b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java
@@ -24,7 +24,6 @@ import org.slf4j.LoggerFactory;
import java.util.Map;
-
/**
* Deserialization handler that logs a deserialization exception and then
* signals the processing pipeline to stop processing more records and fail.
@@ -32,16 +31,21 @@ import java.util.Map;
public class LogAndFailExceptionHandler implements
DeserializationExceptionHandler {
private static final Logger log =
LoggerFactory.getLogger(LogAndFailExceptionHandler.class);
- @Override
+ @SuppressWarnings("deprecation")
@Deprecated
+ @Override
public DeserializationHandlerResponse handle(final ProcessorContext
context,
final ConsumerRecord<byte[],
byte[]> record,
final Exception exception) {
- log.error("Exception caught during Deserialization, " +
- "taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(), record.topic(), record.partition(),
record.offset(),
- exception);
+ log.error(
+ "Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
+ context.taskId(),
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ exception
+ );
return DeserializationHandlerResponse.FAIL;
}
@@ -51,10 +55,14 @@ public class LogAndFailExceptionHandler implements
DeserializationExceptionHandl
final ConsumerRecord<byte[],
byte[]> record,
final Exception exception) {
- log.error("Exception caught during Deserialization, " +
- "taskId: {}, topic: {}, partition: {}, offset: {}",
- context.taskId(), record.topic(), record.partition(),
record.offset(),
- exception);
+ log.error(
+ "Exception caught during Deserialization, taskId: {}, topic: {},
partition: {}, offset: {}",
+ context.taskId(),
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ exception
+ );
return DeserializationHandlerResponse.FAIL;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
index 9c2cf91c605..f592663a6c0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java
@@ -32,10 +32,15 @@ public class LogAndFailProcessingExceptionHandler
implements ProcessingException
@Override
public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
final Record<?, ?> record, final Exception exception) {
- log.error("Exception caught during message processing, " +
- "processor node: {}, taskId: {}, source topic: {}, source
partition: {}, source offset: {}",
- context.processorNodeId(), context.taskId(), context.topic(),
context.partition(), context.offset(),
- exception);
+ log.error(
+ "Exception caught during message processing, processor node: {},
taskId: {}, source topic: {}, source partition: {}, source offset: {}",
+ context.processorNodeId(),
+ context.taskId(),
+ context.topic(),
+ context.partition(),
+ context.offset(),
+ exception
+ );
return ProcessingHandlerResponse.FAIL;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
index 33b2596be12..7dc1b90bc2e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessingExceptionHandler.java
@@ -26,25 +26,30 @@ public interface ProcessingExceptionHandler extends
Configurable {
/**
* Inspect a record and the exception received
*
- * @param context processing context metadata
- * @param record record where the exception occurred
- * @param exception the actual exception
+ * @param context
+ * Processing context metadata.
+ * @param record
+ * Record where the exception occurred.
+ * @param exception
+ * The actual exception.
+ *
+ * @return Whether to continue or stop processing.
*/
ProcessingHandlerResponse handle(final ErrorHandlerContext context, final
Record<?, ?> record, final Exception exception);
enum ProcessingHandlerResponse {
- /* continue with processing */
+ /** Continue processing. */
CONTINUE(1, "CONTINUE"),
- /* fail the processing and stop */
+ /** Fail processing. */
FAIL(2, "FAIL");
/**
- * the permanent and immutable name of processing exception response
+ * An english description for the used option. This is for debugging
only and may change.
*/
public final String name;
/**
- * the permanent and immutable id of processing exception response
+ * The permanent and immutable id for the used option. This can't
change ever.
*/
public final int id;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
index 44636043786..1e40b79fd86 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProcessorStateException.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.errors;
-
/**
* Indicates a processor state operation (e.g. put, get) has failed.
*
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index 95127887b36..ed6b38a5692 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -26,10 +26,15 @@ import org.apache.kafka.common.Configurable;
public interface ProductionExceptionHandler extends Configurable {
/**
* Inspect a record that we attempted to produce, and the exception that
resulted
- * from attempting to produce it and determine whether or not to continue
processing.
+ * from attempting to produce it and determine to continue or stop
processing.
+ *
+ * @param record
+ * The record that failed to produce.
+ * @param exception
+ * The exception that occurred during production.
+ *
+ * @return Whether to continue or stop processing, or retry the failed
operation.
*
- * @param record The record that failed to produce
- * @param exception The exception that occurred during production
* @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext,
ProducerRecord, Exception)} instead.
*/
@Deprecated
@@ -40,11 +45,16 @@ public interface ProductionExceptionHandler extends
Configurable {
/**
* Inspect a record that we attempted to produce, and the exception that
resulted
- * from attempting to produce it and determine whether or not to continue
processing.
+ * from attempting to produce it and determine to continue or stop
processing.
+ *
+ * @param context
+ * The error handler context metadata.
+ * @param record
+ * The record that failed to produce.
+ * @param exception
+ * The exception that occurred during production.
*
- * @param context The error handler context metadata
- * @param record The record that failed to produce
- * @param exception The exception that occurred during production
+ * @return Whether to continue or stop processing, or retry the failed
operation.
*/
default ProductionExceptionHandlerResponse handle(final
ErrorHandlerContext context,
final
ProducerRecord<byte[], byte[]> record,
@@ -56,10 +66,16 @@ public interface ProductionExceptionHandler extends
Configurable {
* Handles serialization exception and determine if the process should
continue. The default implementation is to
* fail the process.
*
- * @param record the record that failed to serialize
- * @param exception the exception that occurred during serialization
+ * @param record
+ * The record that failed to serialize.
+ * @param exception
+ * The exception that occurred during serialization.
+ *
+ * @return Whether to continue or stop processing, or retry the failed
operation.
+ *
* @deprecated Since 3.9. Use {@link
#handleSerializationException(ErrorHandlerContext, ProducerRecord, Exception,
SerializationExceptionOrigin)} instead.
*/
+ @SuppressWarnings({"rawtypes", "unused"})
@Deprecated
default ProductionExceptionHandlerResponse
handleSerializationException(final ProducerRecord record,
final Exception exception) {
@@ -70,11 +86,18 @@ public interface ProductionExceptionHandler extends
Configurable {
* Handles serialization exception and determine if the process should
continue. The default implementation is to
* fail the process.
*
- * @param context the error handler context metadata
- * @param record the record that failed to serialize
- * @param exception the exception that occurred during serialization
- * @param origin the origin of the serialization exception
+ * @param context
+ * The error handler context metadata.
+ * @param record
+ * The record that failed to serialize.
+ * @param exception
+ * The exception that occurred during serialization.
+ * @param origin
+ * The origin of the serialization exception.
+ *
+ * @return Whether to continue or stop processing, or retry the failed
operation.
*/
+ @SuppressWarnings("rawtypes")
default ProductionExceptionHandlerResponse
handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
index 45329c8101b..ed74d972a6d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreMigratedException.java
@@ -23,6 +23,7 @@ package org.apache.kafka.streams.errors;
* This could happen because the store moved to some other instance during a
rebalance so
* rediscovery of the state store is required before retrying.
*/
+@SuppressWarnings("unused")
public class StateStoreMigratedException extends InvalidStateStoreException {
private static final long serialVersionUID = 1L;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
index 7cec17c40d6..26660b2ea7e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/StateStoreNotAvailableException.java
@@ -22,6 +22,7 @@ package org.apache.kafka.streams.errors;
* {@link org.apache.kafka.streams.KafkaStreams.State#NOT_RUNNING NOT_RUNNING}
or
* {@link org.apache.kafka.streams.KafkaStreams.State#ERROR ERROR} state.
*/
+@SuppressWarnings("unused")
public class StateStoreNotAvailableException extends
InvalidStateStoreException {
private static final long serialVersionUID = 1L;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
index c3c03f53001..07cdeb4bc3b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsException.java
@@ -61,9 +61,9 @@ public class StreamsException extends KafkaException {
}
/**
- * @return the {@link TaskId} that this exception originated from, or
{@link Optional#empty()} if the exception
- * cannot be traced back to a particular task. Note that the
{@code TaskId} being empty does not
- * guarantee that the exception wasn't directly related to a
specific task.
+ * @return The {@link TaskId} that this exception originated from, or
{@link Optional#empty()} if the exception
+ * cannot be traced back to a particular task. Note that the
{@code TaskId} being empty does not
+ * guarantee that the exception wasn't directly related to a
specific task.
*/
public Optional<TaskId> taskId() {
return Optional.ofNullable(taskId);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
index 562be0ebde8..1179c07f422 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsNotStartedException.java
@@ -32,6 +32,7 @@ public class StreamsNotStartedException extends
InvalidStateStoreException {
super(message);
}
+ @SuppressWarnings("unused")
public StreamsNotStartedException(final String message, final Throwable
throwable) {
super(message, throwable);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
index 4b8e14c9b65..7c6c027d266 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsRebalancingException.java
@@ -21,6 +21,7 @@ package org.apache.kafka.streams.errors;
* cannot be queried by default. You can retry to query after the rebalance
finished. As an alternative, you can also query
* (potentially stale) state stores during a rebalance via {@link
org.apache.kafka.streams.StoreQueryParameters#enableStaleStores()}.
*/
+@SuppressWarnings("unused")
public class StreamsRebalancingException extends InvalidStateStoreException {
private static final long serialVersionUID = 1L;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
index c05708bc921..252edbe3143 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsStoppedException.java
@@ -32,6 +32,7 @@ public class StreamsStoppedException extends
InvalidStateStoreException {
super(message);
}
+ @SuppressWarnings("unused")
public StreamsStoppedException(final String message, final Throwable
throwable) {
super(message, throwable);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
index 5502b35bc21..2e76d5a0788 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/StreamsUncaughtExceptionHandler.java
@@ -19,7 +19,11 @@ package org.apache.kafka.streams.errors;
public interface StreamsUncaughtExceptionHandler {
/**
* Inspect the exception received in a stream thread and respond with an
action.
- * @param exception the actual exception
+ *
+ * @param exception
+ * The actual exception.
+ *
+ * @return Whether to replace the failed thread, or to shut down the
client or the whole application.
*/
StreamThreadExceptionResponse handle(final Throwable exception);
@@ -27,14 +31,21 @@ public interface StreamsUncaughtExceptionHandler {
* Enumeration that describes the response from the exception handler.
*/
enum StreamThreadExceptionResponse {
+ /** Replace the failed thread with a new one. */
REPLACE_THREAD(0, "REPLACE_THREAD"),
+ /** Shut down the client. */
SHUTDOWN_CLIENT(1, "SHUTDOWN_KAFKA_STREAMS_CLIENT"),
+ /** Try to shut down the whole application. */
SHUTDOWN_APPLICATION(2, "SHUTDOWN_KAFKA_STREAMS_APPLICATION");
- /** an english description of the api--this is for debugging and can
change */
+ /**
+ * An english description for the used option. This is for debugging
only and may change.
+ */
public final String name;
- /** the permanent and immutable id of an API--this can't change ever */
+ /**
+ * The permanent and immutable id for the used option. This can't
change ever.
+ */
public final int id;
StreamThreadExceptionResponse(final int id, final String name) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
index 0f6c50579d3..8fda0fb19b1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskCorruptedException.java
@@ -24,10 +24,10 @@ import java.util.Set;
/**
* Indicates a specific task is corrupted and need to be re-initialized. It
can be thrown when:
*
- * <ol>
+ * <ul>
* <li>Under EOS, if the checkpoint file does not contain offsets for
corresponding store's changelogs, meaning previously it was not close
cleanly.</li>
* <li>Out-of-range exception thrown during restoration, meaning that the
changelog has been modified and we re-bootstrap the store.</li>
- * </ol>
+ * </ul>
*/
public class TaskCorruptedException extends StreamsException {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
index 63493436816..04e60ffb6c4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/TaskIdFormatException.java
@@ -35,6 +35,7 @@ public class TaskIdFormatException extends StreamsException {
super("Task id cannot be parsed correctly" + (message == null ? "" : "
from " + message), throwable);
}
+ @SuppressWarnings("unused")
public TaskIdFormatException(final Throwable throwable) {
super(throwable);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
index 1eaef0691b8..30ed93f2aa0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/TopologyException.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.errors;
-
/**
* Indicates a pre run time error occurred while parsing the {@link
org.apache.kafka.streams.Topology logical topology}
* to construct the {@link
org.apache.kafka.streams.processor.internals.ProcessorTopology physical
processor topology}.
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
index 0ee0658bec4..8fffe89be2e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownStateStoreException.java
@@ -28,6 +28,7 @@ public class UnknownStateStoreException extends
InvalidStateStoreException {
super(message);
}
+ @SuppressWarnings("unused")
public UnknownStateStoreException(final String message, final Throwable
throwable) {
super(message, throwable);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
index d7644841a23..accac453a3d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/UnknownTopologyException.java
@@ -27,6 +27,7 @@ public class UnknownTopologyException extends
StreamsException {
super(message + " due to being unable to locate a Topology named " +
namedTopology);
}
+ @SuppressWarnings("unused")
public UnknownTopologyException(final String message, final Throwable
throwable, final String namedTopology) {
super(message + " due to being unable to locate a Topology named " +
namedTopology, throwable);
}