Ayoub Omari created KAFKA-16394:
-----------------------------------

             Summary: ForeignKey LEFT join propagates null value on foreignKey change
                 Key: KAFKA-16394
                 URL: https://issues.apache.org/jira/browse/KAFKA-16394
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 3.7.0
            Reporter: Ayoub Omari
         Attachments: ForeignJoinTest.scala, JsonSerde.scala

We have two topics, _left-topic[String, LeftRecord]_ and _right-topic[String, String]_, where _LeftRecord_ is:
{code:scala}
 case class LeftRecord(foreignKey: String, name: String){code}
We perform a simple foreign-key LEFT join on left-topic's _foreignKey_ field. The joiner simply returns the right-topic value.
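The topology itself is only in the attachment. As a reading aid, here is a minimal, Kafka-free Scala model of the output the report *expects* from this join, assuming that right-topic is fully populated before any left record arrives (as in both scenarios below) and never changes afterwards. `None` stands for a null value, and `ExpectedFkLeftJoin` is a hypothetical name; this is not the Kafka Streams implementation. `LeftRecord` is redeclared so the block compiles on its own.

```scala
// Minimal, Kafka-free model of the output the report EXPECTS from the
// foreign-key LEFT join. Not Kafka Streams code; names are hypothetical.
final case class LeftRecord(foreignKey: String, name: String)

object ExpectedFkLeftJoin {
  /** Assumes right-topic is fully loaded before any left event and never changes.
    * `None` in a left event is a left-side tombstone; `None` in the output is a
    * null join result.
    */
  def run(
      right: Map[String, String],
      leftEvents: Seq[(String, Option[LeftRecord])]
  ): Seq[(String, Option[String])] =
    leftEvents.map {
      // Left-side delete: exactly one null for that primary key.
      case (pk, None) => (pk, None)
      // Left-side upsert: LEFT join semantics, so a missing right value also
      // yields null; the joiner simply returns the right-topic value.
      case (pk, Some(rec)) => (pk, right.get(rec.foreignKey))
    }
}
```

Fed the inputs of the two scenarios, this model yields exactly one output per left event: `(pk1, Some("1")), (pk1, Some("2"))` for scenario 1 and `(pk1, Some("1")), (pk1, None)` for scenario 2.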

 

+*Scenario 1: Change foreignKey*+

Input the following
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2") 

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", LeftRecord("fk2", "pk1"))
{code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, 2){code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, 2){code}
 

A null is propagated to the join result when the foreign key changes, even though the new foreign key (fk2) has a matching right-topic record.

 

+*Scenario 2: Delete PrimaryKey*+

Input
{code:scala}
rightTopic.pipeInput("fk1", "1")
rightTopic.pipeInput("fk2", "2")

leftTopic.pipeInput("pk1", LeftRecord("fk1", "pk1"))
leftTopic.pipeInput("pk1", null) {code}
 

*+Expected result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null) {code}
 

*+Actual result+*
{code:scala}
KeyValue(pk1, 1)
KeyValue(pk1, null)
KeyValue(pk1, null) {code}
An additional null is propagated to the join result, so a single left-side delete produces two nulls.

 

This bug does not occur in versions 3.6.0 and earlier.

 

I believe the issue comes from this line:
[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionSendProcessorSupplier.java#L134]

where the deletion is propagated in both of the scenarios above.
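To make this hypothesis concrete, the toy model below (hypothetical `ObservedFkLeftJoinModel`, not Kafka code) reproduces the 3.7.0 output of both scenarios. It emits one extra null whenever a primary key's previous foreign key is dropped, either by a foreign-key change or by a left-side delete, before emitting the regular join result. It is only a behavioral sketch of the observation above and says nothing about how SubscriptionSendProcessorSupplier is actually structured; it uses the same assumptions as a static, pre-loaded right-topic.

```scala
import scala.collection.mutable

// Toy model that REPRODUCES the output observed on 3.7.0. Hypothetical; it is
// not how Kafka Streams (or SubscriptionSendProcessorSupplier) is implemented.
final case class LeftRecord(foreignKey: String, name: String)

object ObservedFkLeftJoinModel {
  def run(
      right: Map[String, String],
      leftEvents: Seq[(String, Option[LeftRecord])]
  ): Seq[(String, Option[String])] = {
    // Last left value seen per primary key, i.e. the "old" side of each update.
    val previous = mutable.Map.empty[String, LeftRecord]
    leftEvents.flatMap { case (pk, newValue) =>
      // Assumed trigger: the old foreign key is dropped, either because it
      // changed or because the left record was deleted.
      val oldFkDropped = previous.get(pk).exists { old =>
        !newValue.map(_.foreignKey).contains(old.foreignKey)
      }
      // The extra null that the report attributes to the propagated deletion.
      val spurious: Seq[(String, Option[String])] =
        if (oldFkDropped) Seq((pk, None)) else Seq.empty
      newValue match {
        case Some(rec) => previous(pk) = rec
        case None      => previous -= pk
      }
      // The regular join result, identical to the expected behavior.
      val result: (String, Option[String]) =
        (pk, newValue.flatMap(rec => right.get(rec.foreignKey)))
      spurious :+ result
    }
  }
}
```

On the scenario inputs it yields `(pk1, Some("1")), (pk1, None), (pk1, Some("2"))` and `(pk1, Some("1")), (pk1, None), (pk1, None)`, matching the actual results reported. Updates that keep the same foreign key emit no extra null in this model; the report does not cover that case, so that part is an assumption.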

 

Attaching the topology I used.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
