This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new aaed1bdd891 KAFKA-16448: Unify class cast exception handling for both
key and value (#16736)
aaed1bdd891 is described below
commit aaed1bdd8911be329123c7f675deb6875ebf55bd
Author: Loïc GREFFIER <[email protected]>
AuthorDate: Wed Jul 31 22:24:15 2024 +0200
KAFKA-16448: Unify class cast exception handling for both key and value
(#16736)
Part of KIP-1033. Minor code cleanup.
Reviewers: Matthias J. Sax <[email protected]>
---
.../processor/internals/RecordCollectorImpl.java | 52 +++++++++-------------
1 file changed, 22 insertions(+), 30 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 42b8d4f082b..7a8b77b8a5b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -52,9 +52,11 @@ import
org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;
import org.slf4j.Logger;
+import java.text.MessageFormat;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
@@ -199,7 +201,8 @@ public class RecordCollectorImpl implements RecordCollector
{
try {
keyBytes = keySerializer.serialize(topic, headers, key);
} catch (final ClassCastException exception) {
- throw createStreamsExceptionForKeyClassCastException(
+ throw createStreamsExceptionForClassCastException(
+ ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
topic,
key,
keySerializer,
@@ -222,7 +225,8 @@ public class RecordCollectorImpl implements RecordCollector
{
try {
valBytes = valueSerializer.serialize(topic, headers, value);
} catch (final ClassCastException exception) {
- throw createStreamsExceptionForValueClassCastException(
+ throw createStreamsExceptionForClassCastException(
+ ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
topic,
value,
valueSerializer,
@@ -334,39 +338,27 @@ public class RecordCollectorImpl implements
RecordCollector {
droppedRecordsSensor.record();
}
- private <K> StreamsException
createStreamsExceptionForKeyClassCastException(final String topic,
-
final K key,
-
final Serializer<K> keySerializer,
-
final ClassCastException exception) {
- final String keyClass = key == null ? "unknown because key is null" :
key.getClass().getName();
- return new StreamsException(
- String.format(
- "ClassCastException while producing data to topic %s.
" +
- "The key serializer %s is not compatible to
the actual key type: %s. " +
- "Change the default key serde in StreamConfig
or provide the correct key serde via method parameters " +
- "(for example if using the DSL, `#to(String
topic, Produced<K, V> produced)` with " +
-
"`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
- topic,
- keySerializer.getClass().getName(),
- keyClass),
- exception);
- }
- private <V> StreamsException
createStreamsExceptionForValueClassCastException(final String topic,
-
final V value,
-
final Serializer<V> valueSerializer,
-
final ClassCastException exception) {
- final String valueClass = value == null ? "unknown because value is
null" : value.getClass().getName();
+ private <KV> StreamsException
createStreamsExceptionForClassCastException(final
ProductionExceptionHandler.SerializationExceptionOrigin origin,
+
final String topic,
+
final KV keyOrValue,
+
final Serializer<KV> keyOrValueSerializer,
+
final ClassCastException exception) {
+ final String keyOrValueClass = keyOrValue == null
+ ? String.format("unknown because %s is null",
origin.toString().toLowerCase(Locale.ROOT)) : keyOrValue.getClass().getName();
+
return new StreamsException(
+ MessageFormat.format(
String.format(
"ClassCastException while producing data to topic %s.
" +
- "The value serializer %s is not compatible to
the actual value type: %s. " +
- "Change the default value serde in
StreamConfig or provide the correct value serde via method parameters " +
- "(for example if using the DSL, `#to(String
topic, Produced<K, V> produced)` with " +
-
"`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
+ "The {0} serializer %s is not compatible to the
actual {0} type: %s. " +
+ "Change the default {0} serde in StreamConfig or
provide the correct {0} serde via method parameters " +
+ "(for example if using the DSL, `#to(String topic,
Produced<K, V> produced)` with " +
+
"`Produced.{0}Serde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
topic,
- valueSerializer.getClass().getName(),
- valueClass),
+ keyOrValueSerializer.getClass().getName(),
+ keyOrValueClass),
+ origin.toString().toLowerCase(Locale.ROOT)),
exception);
}