Aleksandr Sorokoumov created KAFKA-13769:
--------------------------------------------
Summary: KTable FK join can miss records if an upstream
non-key-changing operation changes key serializer
Key: KAFKA-13769
URL: https://issues.apache.org/jira/browse/KAFKA-13769
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Aleksandr Sorokoumov
Consider a topology in which the source KTable is followed by a
{{transformValues}} operation [that changes the key
schema|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L452]
and then by a foreign key join. The FK join can miss records in such a
topology because they can be sent to the wrong partitions.
Because {{transformValues}} does not change the key itself, no repartitioning
happens after this operation. However, the KTable instance that calls
{{doJoinOnForeignKey}} uses the new serde coming from {{transformValues}}
rather than the original one. As a result, all nodes in the FK join topology
except for
[SubscriptionResolverJoinProcessorSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225-L1232]
use the "new" serde. {{SubscriptionResolverJoinProcessorSupplier}} uses the
old one because it relies on
[valueGetterSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225],
which in turn retrieves the records from the topic.
A different serializer may serialize the same key to a different sequence of
bytes, which leads to records being sent to the wrong partitions. To run into
this issue, all of the following must hold:
* a topic has more than one partition,
* the KTable's serializer is modified via a non-key-changing operation,
* the new serializer serializes keys differently.
In practice, this can happen if the key type is a {{Struct}}, because it
serializes to a JSON string {{columnName -> value}}. If the {{transformValues}}
operation renames columns to avoid name clashes with the joining table,
such a join can produce incorrect results.
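
To illustrate the mechanism, here is a minimal sketch in plain Java, not taken from the Kafka code base. The names {{SerdeMismatchDemo}}, {{serialize}} and {{partitionFor}}, the column names {{id}} and {{customer_id}}, and the partition count of 4 are all hypothetical. {{Arrays.hashCode}} stands in for the real hash over the serialized key bytes, so the partition numbers it prints are illustrative only. The sketch shows that the same logical key serializes to different bytes once a column is renamed, so a hash-based partitioner is no longer guaranteed to pick the same partition.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class SerdeMismatchDemo {

    // Hypothetical stand-in for a Struct key serializer: writes the key as a
    // JSON object whose field name is the column name.
    static byte[] serialize(String columnName, long value) {
        String json = "{\"" + columnName + "\":" + value + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Stand-in for hash partitioning over the serialized key bytes.
    // Arrays.hashCode is used for illustration only; it is NOT Kafka's hash.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4; // assumed; the issue needs more than one

        // Same logical key (42), serialized before and after a column rename.
        byte[] original = serialize("id", 42L);
        byte[] renamed = serialize("customer_id", 42L);

        System.out.println("same bytes: " + Arrays.equals(original, renamed));
        System.out.println("partition (original serde): "
                + partitionFor(original, numPartitions));
        System.out.println("partition (renamed serde): "
                + partitionFor(renamed, numPartitions));
    }
}
```

Whenever the two serialized forms hash to different partitions, a node that uses the old serde and a node that uses the new one will disagree on where a given key lives, which is how the join can miss records.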