This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new aaed1bdd891 KAFKA-16448: Unify class cast exception handling for both key and value (#16736)
aaed1bdd891 is described below

commit aaed1bdd8911be329123c7f675deb6875ebf55bd
Author: Loïc GREFFIER <[email protected]>
AuthorDate: Wed Jul 31 22:24:15 2024 +0200

    KAFKA-16448: Unify class cast exception handling for both key and value (#16736)
    
    Part of KIP-1033. Minor code cleanup.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../processor/internals/RecordCollectorImpl.java   | 52 +++++++++-------------
 1 file changed, 22 insertions(+), 30 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 42b8d4f082b..7a8b77b8a5b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -52,9 +52,11 @@ import org.apache.kafka.streams.processor.internals.metrics.TopicMetrics;
 
 import org.slf4j.Logger;
 
+import java.text.MessageFormat;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -199,7 +201,8 @@ public class RecordCollectorImpl implements RecordCollector {
         try {
             keyBytes = keySerializer.serialize(topic, headers, key);
         } catch (final ClassCastException exception) {
-            throw createStreamsExceptionForKeyClassCastException(
+            throw createStreamsExceptionForClassCastException(
+                ProductionExceptionHandler.SerializationExceptionOrigin.KEY,
                 topic,
                 key,
                 keySerializer,
@@ -222,7 +225,8 @@ public class RecordCollectorImpl implements RecordCollector {
         try {
             valBytes = valueSerializer.serialize(topic, headers, value);
         } catch (final ClassCastException exception) {
-            throw createStreamsExceptionForValueClassCastException(
+            throw createStreamsExceptionForClassCastException(
+                ProductionExceptionHandler.SerializationExceptionOrigin.VALUE,
                 topic,
                 value,
                 valueSerializer,
@@ -334,39 +338,27 @@ public class RecordCollectorImpl implements RecordCollector {
 
         droppedRecordsSensor.record();
     }
-    private <K> StreamsException 
createStreamsExceptionForKeyClassCastException(final String topic,
-                                                                               
 final K key,
-                                                                               
 final Serializer<K> keySerializer,
-                                                                               
 final ClassCastException exception) {
-        final String keyClass = key == null ? "unknown because key is null" : 
key.getClass().getName();
-        return new StreamsException(
-                String.format(
-                        "ClassCastException while producing data to topic %s. 
" +
-                                "The key serializer %s is not compatible to 
the actual key type: %s. " +
-                                "Change the default key serde in StreamConfig 
or provide the correct key serde via method parameters " +
-                                "(for example if using the DSL, `#to(String 
topic, Produced<K, V> produced)` with " +
-                                
"`Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
-                        topic,
-                        keySerializer.getClass().getName(),
-                        keyClass),
-                exception);
-    }
 
-    private <V> StreamsException 
createStreamsExceptionForValueClassCastException(final String topic,
-                                                                               
   final V value,
-                                                                               
   final Serializer<V> valueSerializer,
-                                                                               
   final ClassCastException exception) {
-        final String valueClass = value == null ? "unknown because value is 
null" : value.getClass().getName();
+    private <KV> StreamsException 
createStreamsExceptionForClassCastException(final 
ProductionExceptionHandler.SerializationExceptionOrigin origin,
+                                                                              
final String topic,
+                                                                              
final KV keyOrValue,
+                                                                              
final Serializer<KV> keyOrValueSerializer,
+                                                                              
final ClassCastException exception) {
+        final String keyOrValueClass = keyOrValue == null
+            ? String.format("unknown because %s is null", 
origin.toString().toLowerCase(Locale.ROOT)) : keyOrValue.getClass().getName();
+
         return new StreamsException(
+            MessageFormat.format(
                 String.format(
                         "ClassCastException while producing data to topic %s. 
" +
-                                "The value serializer %s is not compatible to 
the actual value type: %s. " +
-                                "Change the default value serde in 
StreamConfig or provide the correct value serde via method parameters " +
-                                "(for example if using the DSL, `#to(String 
topic, Produced<K, V> produced)` with " +
-                                
"`Produced.valueSerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
+                            "The {0} serializer %s is not compatible to the 
actual {0} type: %s. " +
+                            "Change the default {0} serde in StreamConfig or 
provide the correct {0} serde via method parameters " +
+                            "(for example if using the DSL, `#to(String topic, 
Produced<K, V> produced)` with " +
+                            
"`Produced.{0}Serde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`).",
                         topic,
-                        valueSerializer.getClass().getName(),
-                        valueClass),
+                        keyOrValueSerializer.getClass().getName(),
+                        keyOrValueClass),
+                origin.toString().toLowerCase(Locale.ROOT)),
                 exception);
     }
 

Reply via email to