[
https://issues.apache.org/jira/browse/KAFKA-4270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang updated KAFKA-4270:
---------------------------------
Labels: architecture (was: )
> ClassCast for Aggregation
> -------------------------
>
> Key: KAFKA-4270
> URL: https://issues.apache.org/jira/browse/KAFKA-4270
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Mykola Polonskyi
> Labels: architecture
>
> With serdes defined for the intermediate topic in an aggregation, I get a
> ClassCastException: from my custom class to ByteArray.
> While debugging I saw that the defined serde isn't used when the sinkNode is
> created (inside
> `org.apache.kafka.streams.kstream.internals.KGroupedTableImpl#doAggregate`).
> Instead of the serde passed to the aggregation call, a default implementation
> with empty stubs is used.
> {code:kotlin}
> userTable.join(
>     skicardsTable
>         .groupBy { key, value ->
>             KeyValue(value.skicardInfo.ownerId, value.skicardInfo) }
>         .aggregate(
>             // initializer: start from an empty set
>             { mutableSetOf<SkicardInfo>() },
>             // adder: must return the updated accumulator
>             { ownerId, skicardInfo, accumulator ->
>                 accumulator.apply { add(skicardInfo) } },
>             // subtractor: returns the accumulator unchanged
>             { ownerId, skicardInfo, accumulator ->
>                 accumulator },
>             skicardByOwnerIdSerde,
>             skicardByOwnerIdTopicName
>         ),
>     { userCreatedOrUpdated, skicardInfoSet ->
>         UserWithSkicardsCreatedOrUpdated(userCreatedOrUpdated.user, skicardInfoSet) }
> ).to(
>     userWithSkicardsTable
> )
> {code}
> I think the current behavior of `doAggregate` when setting up serdes and/or
> stores should be changed, because it is also incorrect in release 0.10.0.1-cp1.
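
The failure mode described in the report can be illustrated in isolation. The following is only a minimal sketch that does not use the Kafka Streams library: `Serializer`, `ByteArraySerializer`, `SkicardInfoSerializer`, `SinkNode`, and the `cardId` field are hypothetical stand-ins invented for this example, not Kafka classes. It assumes that a sink holds its serializer untyped, so a mismatch only surfaces at runtime as a ClassCastException.

```kotlin
// Hypothetical stand-in for a serializer interface; NOT the Kafka one.
interface Serializer<T> {
    fun serialize(value: T): ByteArray
}

// Hypothetical value type; the cardId field is an assumption for illustration.
data class SkicardInfo(val ownerId: String, val cardId: String)

// Stand-in for a default serializer that only accepts byte arrays.
class ByteArraySerializer : Serializer<ByteArray> {
    override fun serialize(value: ByteArray): ByteArray = value
}

// Stand-in for the custom serde a caller would pass to the aggregation.
class SkicardInfoSerializer : Serializer<SkicardInfo> {
    override fun serialize(value: SkicardInfo): ByteArray =
        "${value.ownerId}:${value.cardId}".toByteArray()
}

// Stand-in for a sink node: the serializer type is erased, so a wrong
// serializer is only detected when a value is actually sent.
class SinkNode(private val serializer: Serializer<*>) {
    @Suppress("UNCHECKED_CAST")
    fun send(value: Any): ByteArray =
        (serializer as Serializer<Any>).serialize(value)
}

fun main() {
    val value = SkicardInfo("owner-1", "card-7")

    // Sink created with the caller's serializer: the value is serialized.
    val ok = SinkNode(SkicardInfoSerializer()).send(value)
    println(ok.decodeToString())

    // Sink created with the byte-array default: the custom class cannot be
    // cast to ByteArray, so sending fails with a ClassCastException.
    val failure = runCatching { SinkNode(ByteArraySerializer()).send(value) }
    println(failure.exceptionOrNull())
}
```

If the sink is built with the serializer that matches the value type, the send succeeds; the sketch makes no claim about how `doAggregate` actually wires its serdes.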
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)