[ 
https://issues.apache.org/jira/browse/KAFKA-4612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15813501#comment-15813501
 ] 

Matthias J. Sax commented on KAFKA-4612:
----------------------------------------

Did you configure global default key and value Serdes via {{StreamsConfig}}, 
using the parameters {{KEY_SERDE_CLASS_CONFIG}} and 
{{VALUE_SERDE_CLASS_CONFIG}}? I guess it's not a library issue in the strict 
sense but just a misconfiguration. The problem is that the data will be 
repartitioned after {{.selectKey}} by writing it to a topic, and the library 
cannot find the correct Serde for that write: it has to assume that the key 
type changes in {{.selectKey}} and thus falls back to the global default 
Serdes. Right now, {{.selectKey}} does not allow you to specify a new key 
Serde (which is kind of a problem). A workaround would be to put a 
{{.through}} after {{.selectKey}}, as it allows you to specify the required 
Serdes.
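
To make the two options concrete, here is a rough sketch against the 0.10.1 Streams API, reusing the topic names from your example. It assumes Scala 2.12 (for the lambda-to-interface conversion). The application id, the bootstrap server address, and the intermediate topic "topicRekeyed" are made-up placeholders, and the intermediate topic would have to be created up front. I have not run this against your project.

```scala
import java.util.Properties
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

object SelectKeyWorkaroundSketch {

  // Option 1: global default Serdes, used wherever no explicit Serde is given
  // (for example, for the repartition topic written after selectKey).
  def config(): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-4612-sketch")  // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
    props
  }

  // Option 2: put through() right after selectKey() so the Serdes for the
  // re-keyed data are given explicitly instead of taken from the defaults.
  def topology(): KStreamBuilder = {
    val builder = new KStreamBuilder()
    val kafkaTable: KTable[String, String] =
      builder.table(Serdes.String(), Serdes.String(), "topicTable", "topicTable")
    val incomingRecords: KStream[String, String] =
      builder.stream(Serdes.String(), Serdes.String(), "topicInput")
    val reKeyedRecords: KStream[String, String] =
      incomingRecords
        .selectKey[String]((k: String, _: String) => k)
        // "topicRekeyed" is a hypothetical topic name; create it beforehand.
        .through(Serdes.String(), Serdes.String(), "topicRekeyed")
    val joinedRecords: KStream[String, String] =
      reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
    joinedRecords.to(Serdes.String(), Serdes.String(), "topicOutput")
    builder
  }
}
```

Either option on its own should be enough; pass {{config()}} to {{KafkaStreams}} for option 1, or use the topology as shown for option 2.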

Btw: I think in your example you can omit {{.selectKey}}, as it does not set a 
new key anyway. (This change by itself should actually fix the problem: 
repartitioning, which is not necessarily required in your case, is avoided, 
and even if it is required, the library knows that the data type is String and 
can use the correct Serde automatically.)
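
For reference, the simplified topology without {{.selectKey}} might look roughly like the sketch below. It reuses the topic names from your example and assumes Scala 2.12 and that "topicInput" and "topicTable" are co-partitioned, so the join works on the original keys. The object name is made up for illustration.

```scala
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}

object NoSelectKeySketch {

  // Same pipeline as in the report, minus selectKey: the stream keeps its
  // original String key, so the join is applied to the source stream directly.
  def topology(): KStreamBuilder = {
    val builder = new KStreamBuilder()
    val kafkaTable: KTable[String, String] =
      builder.table(Serdes.String(), Serdes.String(), "topicTable", "topicTable")
    val incomingRecords: KStream[String, String] =
      builder.stream(Serdes.String(), Serdes.String(), "topicInput")
    val joinedRecords: KStream[String, String] =
      incomingRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
    joinedRecords.to(Serdes.String(), Serdes.String(), "topicOutput")
    builder
  }
}
```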

Please verify. (Please do not close the JIRA even if my hints resolve the 
problem. We might want to add an overload for {{.selectKey}} that allows 
specifying a new key Serde.)


> Simple Kafka Streams app causes "ClassCastException: java.lang.String cannot 
> be cast to [B"
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-4612
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4612
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: Virtual Machine using Debian 8 + Confluent Platform 
> 3.1.1.
>            Reporter: Kurt Ostfeld
>         Attachments: KafkaIsolatedBug.tar.gz
>
>
> I've attached a minimal single source file project that reliably reproduces 
> this issue.
> This project does the following:
> 1) Create test input data. Produces a single random (String,String) record 
> into two different topics "topicInput" and "topicTable"
> 2) Creates and runs a Kafka Streams application:
>     val kafkaTable: KTable[String, String] = builder.table(Serdes.String, Serdes.String, "topicTable", "topicTable")
>     val incomingRecords: KStream[String, String] = builder.stream(Serdes.String, Serdes.String, "topicInput")
>     val reKeyedRecords: KStream[String, String] = incomingRecords.selectKey((k, _) => k)
>     val joinedRecords: KStream[String, String] = reKeyedRecords.leftJoin(kafkaTable, (s1: String, _: String) => s1)
>     joinedRecords.to(Serdes.String, Serdes.String, "topicOutput")
> This reliably generates the following error:
> [error] (StreamThread-1) java.lang.ClassCastException: java.lang.String cannot be cast to [B
> java.lang.ClassCastException: java.lang.String cannot be cast to [B
>       at org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:18)
>       at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:63)
>       at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:82)
>       at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
>       at org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:44)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>       at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
>       at org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:43)
>       at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:82)
>       at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:202)
>       at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:66)
>       at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:180)
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:436)
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
> One caveat: I'm running this on a Confluent Platform 3.1.1 instance which 
> uses Kafka 0.10.1.0 since there is no newer Confluent Platform available. The 
> Kafka Streams project is built using "kafka-clients" and "kafka-streams" 
> version 0.10.1.1. If I use 0.10.1.0, I reliably hit bug 
> https://issues.apache.org/jira/browse/KAFKA-4355. I am not sure if there is 
> any issue using 0.10.1.1 libraries with a Confluent Platform running Kafka 
> 0.10.1.0. I will obviously try the next Confluent Platform binary when it is 
> available.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
