[ 
https://issues.apache.org/jira/browse/FLINK-24466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17449708#comment-17449708
 ] 

Shen Zhu commented on FLINK-24466:
----------------------------------

Hey [~slinkydeveloper] , thanks for writing the patch, it's very useful!

However, I have a question about filtering late events in 
{*}KeyedCoProcessOperatorWithWatermarkDelayAndFilterLateEvents{*}, based on the 
tests in {*}RowTimeIntervalJoinTest{*}, seems the late events will be cached

 
{code:java}
testHarness.processWatermark1(new Watermark(1));
testHarness.processWatermark2(new Watermark(1));
// Test late data.
testHarness.processElement1(insertRecord(1L, "k1"));
// Though (1L, "k1") is actually late, it will also be cached.
assertEquals(1, testHarness.numEventTimeTimers());
{code}
And if we filter the events in 
{*}KeyedCoProcessOperatorWithWatermarkDelayAndFilterLateEvents{*}, those events 
wouldn't be part of the join process, which will cause the following test cast 
to fail
{code:java}
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(-19));
// This result is produced by the late row (1, "k1").
expectedOutput.add(insertRecord(1L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(2L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(5L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(5L, "k1", 15L, "k1"));
expectedOutput.add(new Watermark(0));
expectedOutput.add(insertRecord(35L, "k1", 15L, "k1"));
expectedOutput.add(new Watermark(18));
expectedOutput.add(insertRecord(40L, "k2", 39L, "k2"));
expectedOutput.add(new Watermark(41));{code}

Do you think we would update the path to respect such behavior?

Best,
Shen Zhu

 

> Interval Join late events handling behaviour is not consistent
> --------------------------------------------------------------
>
>                 Key: FLINK-24466
>                 URL: https://issues.apache.org/jira/browse/FLINK-24466
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Runtime
>            Reporter: Francesco Guardiani
>            Priority: Major
>         Attachments: Fix_late_events_filtering_for_interval_join.patch
>
>
> Interval Join handles late events emitting them in the output, as a padded 
> row. This behavior is also tested extensively in {{RowTimeIntervalJoinTest}}.
> The problem with this behavior is the way an event is considered "late" or 
> not: in order to distinguish between the two, {{RowTimeIntervalJoin}} uses 
> the {{ctx.timerService().currentWatermark()}} to find out if an event is 
> later than the last received watermark or not. But that method returns the 
> "combined" watermark across all the keys, partitions and *input streams*, 
> that is if one of the two streams goes "slower" than the other one, the 
> returned watermark is going to be the minimum among the two.
> This means that our late events handling effectively works only if the two 
> streams run "at the same pace", otherwise we'll just see what we consider 
> _late events_ for one of the two streams as joined.
> To observe this behavior, just run the test 
> {{IntervalJoinITCase#testRowTimeInnerJoinWithEquiTimeAttrs}} in this revision 
> https://github.com/apache/flink/commit/7033cbfe404bea1519d3342a611e2f92768d70f9
>  several times and you'll see that after a couple of runs it fails, joining 
> one of the {{"should-be-discarded"}} records. Those records are way behind 
> the watermark - 1 second, as defined.
> You'll find attached in the issue a small patch to show how this could be 
> fixed.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to