Mykola Polonskyi created KAFKA-4270:
---------------------------------------
Summary: ClassCast for Agregation
Key: KAFKA-4270
URL: https://issues.apache.org/jira/browse/KAFKA-4270
Project: Kafka
Issue Type: Bug
Reporter: Mykola Polonskyi
With defined serdes for intermediate topic in aggregation catch the
ClassCastException: from custom class to the ByteArray.
In debug I saw that defined serde isn't used for creation sinkNode (incide
`org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`)
Instead defined serde inside aggregation call is used default Impl with empty
plugs instead of implementations
{code:koltin}
userTable.join(
skicardsTable.groupBy { key, value ->
KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
.aggregate(
{ mutableSetOf<SkicardInfo>() },
{ ownerId, skicardInfo, accumulator ->
accumulator.put(skicardInfo) },
{ ownerId, skicardInfo, accumulator -> accumulator
},
skicardByOwnerIdSerde,
skicardByOwnerIdTopicName
),
{ userCreatedOrUpdated, skicardInfoSet ->
UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
).to(
userWithSkicardsTable
)
{code}
I think current behavior of `doAggregate` with serdes and/or stores setting up
should be changed because that is incorrect in release 0.10.0.1-cp1 to.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)