[ 
https://issues.apache.org/jira/browse/KAFKA-14400?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17636907#comment-17636907
 ] 

Matthias J. Sax edited comment on KAFKA-14400 at 11/21/22 10:40 PM:
--------------------------------------------------------------------

The old and new API have different semantics. The old API computes left-join 
results eagerly, while the new API does not. Hence, the change is by design.

In the old API, when a left record is processed, we do a lookup into the right 
window and, if no join partner is found, emit a left-join result record right 
away. However, this left-join result might be spurious: if a right record that 
arrives later matches within the same window, we emit an additional inner-join 
result, and the original left-join result is incorrect.

The new API does it differently: when we don't find a join partner, we "buffer" 
the left record in an additional store (the KSTREAM-OUTERSHARED store) and we 
don't emit a left-join result at this point. If we later get a right input 
record that matches, we emit only the corresponding inner-join result. If we 
never get a right record that matches, we emit the left-join result once the 
window has closed and the grace period has passed.

Cf. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics 
and https://issues.apache.org/jira/browse/KAFKA-10847 

Thus, to "trigger" the left-join result, stream-time must advance far enough 
that the join window closes, i.e., you need to send more data with larger 
timestamps into TopologyTestDriver to "flush" the left result.
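
To make the difference concrete, here is a toy model of the two behaviors. It 
is only a sketch and not Kafka Streams code: the class LeftJoinSketch, its 
methods, and the "left+right" output strings are hypothetical, the window is 
fixed at 10000 ms as in the report, the grace period is assumed to be zero, 
and stream-time is simply the largest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model of eager vs. deferred left-join emission (NOT the Kafka Streams implementation).
public class LeftJoinSketch {
    static final long WINDOW_MS = 10_000L; // assumed join window, zero grace

    static final class Rec {
        final String key;
        final String value;
        final long ts;
        Rec(String key, String value, long ts) {
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final boolean eager;                        // true = old behavior, false = new behavior
    private final List<Rec> lefts = new ArrayList<>();   // stands in for the left window store
    private final List<Rec> rights = new ArrayList<>();  // stands in for the right window store
    private final List<Rec> pending = new ArrayList<>(); // unmatched left records (the "outer" buffer)
    private long streamTime = Long.MIN_VALUE;
    final List<String> out = new ArrayList<>();          // emitted join results, in order

    LeftJoinSketch(boolean eager) {
        this.eager = eager;
    }

    private static boolean inWindow(Rec a, Rec b) {
        return a.key.equals(b.key) && Math.abs(a.ts - b.ts) <= WINDOW_MS;
    }

    void left(String key, String value, long ts) {
        Rec l = new Rec(key, value, ts);
        streamTime = Math.max(streamTime, ts);
        lefts.add(l);
        boolean matched = false;
        for (Rec r : rights) {
            if (inWindow(l, r)) {
                out.add(l.value + "+" + r.value);
                matched = true;
            }
        }
        if (!matched) {
            if (eager) {
                out.add(l.value + "+null"); // old behavior: emit right away, possibly spurious
            } else {
                pending.add(l);             // new behavior: hold back until the window closes
            }
        }
        flushClosedWindows();
    }

    void right(String key, String value, long ts) {
        Rec r = new Rec(key, value, ts);
        streamTime = Math.max(streamTime, ts);
        rights.add(r);
        for (Rec l : lefts) {
            if (inWindow(l, r)) {
                out.add(l.value + "+" + r.value);
                pending.remove(l); // a match cancels the pending left-join result
            }
        }
        flushClosedWindows();
    }

    // Emit left-join results for buffered records whose window has closed.
    private void flushClosedWindows() {
        Iterator<Rec> it = pending.iterator();
        while (it.hasNext()) {
            Rec l = it.next();
            if (l.ts + WINDOW_MS < streamTime) {
                out.add(l.value + "+null");
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        LeftJoinSketch deferred = new LeftJoinSketch(false);
        deferred.left("k", "L", 0L);
        System.out.println(deferred.out); // prints []
        deferred.left("other", "X", 10_001L); // unrelated key, but it advances stream-time
        System.out.println(deferred.out); // prints [L+null]
    }
}
```

In this model the second record does not need to share the key of the first 
one; it only has to carry a timestamp large enough to move stream-time past 
the end of the window. That is the same kind of "flush" record a 
TopologyTestDriver test has to pipe in before asserting on the left-join 
output.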

I think we can close this as "not a bug"?



> KStream - KStream - LeftJoin() does not call ValueJoiner with null value 
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-14400
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14400
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.1.1, 3.3.1
>         Environment: Windows PC 
>            Reporter: Victor van den Hoven
>            Priority: Major
>         Attachments: Afbeelding 2.png, SimpleStreamTopology.java, 
> SimpleStreamTopologyTest.java
>
>
> In Kafka Streams 3.1.1:
> When using JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(10000)),
> KStream leftJoin(KStream otherStream, ValueJoiner joiner, 
> JoinWindows windows) does not seem to call the joiner with a null value when 
> the join predicate is not satisfied (not expected).
>  
> When using the deprecated JoinWindows.of(Duration.ofMillis(10000)),
> KStream leftJoin(KStream otherStream, ValueJoiner joiner, 
> JoinWindows windows) does call the joiner with a null value when the join 
> predicate is not satisfied (as expected and documented).
>  
> Attached you can find two files with a TopologyTestDriver unit test to 
> reproduce.



