This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 84b111f KAFKA-12963: Add processor name to error (#11262)
84b111f is described below
commit 84b111f968a47c65d3498a1b3af42dd644403728
Author: Andy Lapidas <[email protected]>
AuthorDate: Fri Aug 27 22:06:49 2021 -0500
KAFKA-12963: Add processor name to error (#11262)
This PR adds the processor name to the message of the StreamsException that
process() throws when it catches a ClassCastException.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
.../org/apache/kafka/streams/processor/internals/ProcessorNode.java | 3 ++-
.../apache/kafka/streams/processor/internals/ProcessorNodeTest.java | 3 ++-
2 files changed, 4 insertions(+), 2 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index dfc0b70..48c95f1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -147,7 +147,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
} catch (final ClassCastException e) {
final String keyClass = record.key() == null ? "unknown because
key is null" : record.key().getClass().getName();
final String valueClass = record.value() == null ? "unknown
because value is null" : record.value().getClass().getName();
- throw new StreamsException(String.format("ClassCastException
invoking Processor. Do the Processor's "
+ throw new StreamsException(String.format("ClassCastException
invoking processor: %s. Do the Processor's "
+ "input types match the deserialized types? Check the
Serde setup and change the default Serdes in "
+ "StreamConfig or provide correct Serdes via method
parameters. Make sure the Processor can accept "
+ "the deserialized input of type key: %s, and value:
%s.%n"
@@ -155,6 +155,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
+ "another cause (in user code, for example). For example,
if a processor wires in a store, but casts "
+ "the generics incorrectly, a class cast exception could
be raised during processing, but the "
+ "cause would not be wrong Serdes.",
+ this.name(),
keyClass,
valueClass),
e);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 73147ed..87a4c68 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -193,7 +193,7 @@ public class ProcessorNodeTest {
final StreamsMetricsImpl streamsMetrics =
new StreamsMetricsImpl(metrics, "test-client",
StreamsConfig.METRICS_LATEST, new MockTime());
final InternalMockProcessorContext<Object, Object> context = new
InternalMockProcessorContext<>(streamsMetrics);
- final ProcessorNode<Object, Object, Object, Object> node = new
ProcessorNode<>("name", new ClassCastProcessor(), Collections.emptySet());
+ final ProcessorNode<Object, Object, Object, Object> node = new
ProcessorNode<>("pname", new ClassCastProcessor(), Collections.emptySet());
node.init(context);
final StreamsException se = assertThrows(
StreamsException.class,
@@ -202,5 +202,6 @@ public class ProcessorNodeTest {
assertThat(se.getCause(), instanceOf(ClassCastException.class));
assertThat(se.getMessage(), containsString("default Serdes"));
assertThat(se.getMessage(), containsString("input types"));
+ assertThat(se.getMessage(), containsString("pname"));
}
}