Aleksandr Sorokoumov created KAFKA-13769:
--------------------------------------------

             Summary: KTable FK join can miss records if an upstream 
non-key-changing operation changes key serializer
                 Key: KAFKA-13769
                 URL: https://issues.apache.org/jira/browse/KAFKA-13769
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: Aleksandr Sorokoumov


Consider a topology in which the source KTable is followed by a 
{{transformValues}} operation [that changes the key 
schema|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L452]
 followed by a foreign key join. The FK join might miss records in such a 
topology because they might be sent to the wrong partitions.

As {{transformValues}} does not change the key itself, no repartitioning happens 
after this operation. However, the KTable instance that calls 
{{doJoinOnForeignKey}} uses the new serde coming from {{transformValues}} 
rather than the original. As a result, all nodes in the FK join topology except 
for 
[SubscriptionResolverJoinProcessorSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225-L1232]
 use the "new" serde. {{SubscriptionResolverJoinProcessorSupplier}} uses the 
old one because it uses 
[valueGetterSupplier|https://github.com/apache/kafka/blob/db724f23f38cdb6c668a10681ea2a03bb11611ad/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L1225]
 which in turn retrieves the records from the topic.

A different serializer might serialize the same key to a different sequence of 
bytes, so records can be sent to the wrong partitions. For this issue to occur, 
all of the following must hold:
* a topic should have more than one partition,
* KTable's serializer should be modified via a non-key-changing operation,
* the new serializer should serialize keys differently.

In practice, it might happen if the key type is a {{Struct}} because it 
serializes to a JSON string {{columnName -> value}}. If the {{transformValues}} 
operation changes column names to avoid name clashes with the joining table, 
such a join can lead to incorrect behavior.
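
The effect can be illustrated with a minimal, dependency-free sketch. Everything in it is an assumption made for illustration: {{serializeKey}} is a hypothetical stand-in for a {{Struct}} serializer that writes a JSON string, and {{partitionFor}} uses {{Arrays.hashCode}} in place of the murmur2 hash that Kafka's default partitioner applies to the serialized key bytes. It is not code from Kafka Streams.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class FkJoinPartitionSketch {

    // Hypothetical stand-in for a Struct serializer: renders a single field
    // as a JSON string of the form {"columnName":value}.
    static byte[] serializeKey(String columnName, int value) {
        String json = "{\"" + columnName + "\":" + value + "}";
        return json.getBytes(StandardCharsets.UTF_8);
    }

    // Simplified partitioner: hash of the serialized key bytes modulo the
    // partition count. Arrays.hashCode is an assumption used only to keep the
    // sketch self-contained; Kafka's default partitioner uses murmur2.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return Math.floorMod(Arrays.hashCode(keyBytes), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 4;

        // Same logical key (42), serialized before and after a column rename.
        byte[] original = serializeKey("id", 42);
        byte[] renamed = serializeKey("left_id", 42);

        System.out.println("same bytes: " + Arrays.equals(original, renamed));
        System.out.println("partition (original serde): "
                + partitionFor(original, numPartitions));
        System.out.println("partition (renamed serde): "
                + partitionFor(renamed, numPartitions));
    }
}
```

Because the partition is derived from the serialized bytes rather than from the logical key, the two serializations are not guaranteed to land on the same partition, even though the key value itself is unchanged.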



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
