[
https://issues.apache.org/jira/browse/FLINK-24466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17449708#comment-17449708
]
Shen Zhu commented on FLINK-24466:
----------------------------------
Hey [~slinkydeveloper], thanks for writing the patch; it's very useful!
However, I have a question about filtering late events in
{*}KeyedCoProcessOperatorWithWatermarkDelayAndFilterLateEvents{*}. Based on the
tests in {*}RowTimeIntervalJoinTest{*}, it seems that late events are currently cached:
{code:java}
testHarness.processWatermark1(new Watermark(1));
testHarness.processWatermark2(new Watermark(1));
// Test late data.
testHarness.processElement1(insertRecord(1L, "k1"));
// Though (1L, "k1") is actually late, it will also be cached.
assertEquals(1, testHarness.numEventTimeTimers());
{code}
But if we filter such events in
{*}KeyedCoProcessOperatorWithWatermarkDelayAndFilterLateEvents{*}, they would
no longer take part in the join, which would cause the following test case
to fail:
{code:java}
List<Object> expectedOutput = new ArrayList<>();
expectedOutput.add(new Watermark(-19));
// This result is produced by the late row (1, "k1").
expectedOutput.add(insertRecord(1L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(2L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(5L, "k1", 2L, "k1"));
expectedOutput.add(insertRecord(5L, "k1", 15L, "k1"));
expectedOutput.add(new Watermark(0));
expectedOutput.add(insertRecord(35L, "k1", 15L, "k1"));
expectedOutput.add(new Watermark(18));
expectedOutput.add(insertRecord(40L, "k2", 39L, "k2"));
expectedOutput.add(new Watermark(41));
{code}
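To make the trade-off concrete, here is a plain-Java sketch (no Flink dependencies) contrasting the two behaviors for the late row (1L, "k1"). It assumes Flink's watermark contract, where Watermark(t) means no further elements with timestamp <= t are expected, so a row with timestamp 1 arriving after Watermark(1) counts as late. The names {{LateRowSketch}}, {{isLate}} and {{bufferedRows}} are hypothetical and do not mirror the attached patch or the internals of {{RowTimeIntervalJoin}}.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: NOT the RowTimeIntervalJoin implementation or the attached patch.
public class LateRowSketch {

    // Flink watermark contract: Watermark(t) declares that no more elements
    // with timestamp <= t should arrive, so such an element is late.
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    // Returns the rows that end up in a simplified left-side join buffer.
    static List<Long> bufferedRows(List<Long> timestamps, long watermark, boolean filterLate) {
        List<Long> buffer = new ArrayList<>();
        for (long ts : timestamps) {
            if (filterLate && isLate(ts, watermark)) {
                continue; // dropped before the join: it can never produce a joined row
            }
            buffer.add(ts); // cached: it may still join with a matching right-side row
        }
        return buffer;
    }

    public static void main(String[] args) {
        long watermark = 1L;           // both inputs received Watermark(1), as in the test
        List<Long> left = List.of(1L); // the late row (1L, "k1")

        System.out.println("cached:   " + bufferedRows(left, watermark, false)); // [1]
        System.out.println("filtered: " + bufferedRows(left, watermark, true));  // []
    }
}
```

The open question is essentially which value of {{filterLate}} the operator should use: filtering before the join makes late-event handling consistent, but it also removes the {{insertRecord(1L, "k1", 2L, "k1")}} row that the test currently expects.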
Do you think we should update the patch to preserve this behavior?
Best,
Shen Zhu
> Interval Join late events handling behaviour is not consistent
> --------------------------------------------------------------
>
> Key: FLINK-24466
> URL: https://issues.apache.org/jira/browse/FLINK-24466
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Runtime
> Reporter: Francesco Guardiani
> Priority: Major
> Attachments: Fix_late_events_filtering_for_interval_join.patch
>
>
> Interval Join handles late events by emitting them in the output as padded
> rows. This behavior is also tested extensively in {{RowTimeIntervalJoinTest}}.
> The problem with this behavior is how an event is classified as "late":
> to distinguish late events from on-time ones, {{RowTimeIntervalJoin}} uses
> {{ctx.timerService().currentWatermark()}} to check whether an event is
> behind the last received watermark. But that method returns the
> "combined" watermark across all the keys, partitions and *input streams*;
> that is, if one of the two streams is "slower" than the other, the
> returned watermark is the minimum of the two.
> This means that our late-event handling only works reliably if the two
> streams run "at the same pace"; otherwise, events we consider _late_ for
> one of the two streams will still be joined.
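The following plain-Java sketch illustrates the inconsistency described above. It tracks each input's watermark separately alongside the combined watermark (the minimum of the two inputs, which is what {{currentWatermark()}} is described as returning here) and classifies the same left-side row against both. The names {{WatermarkTracker}}, {{isLateCombined}} and {{isLateForLeft}} are hypothetical; this is neither Flink's implementation nor the fix in the attached patch.

```java
// Hypothetical sketch of per-input vs. combined watermark tracking.
public class WatermarkTracker {
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    // Watermarks only move forward, so keep the maximum seen per input.
    void onLeftWatermark(long wm) { leftWatermark = Math.max(leftWatermark, wm); }
    void onRightWatermark(long wm) { rightWatermark = Math.max(rightWatermark, wm); }

    // Combined watermark: the minimum across both inputs, so a slow input holds it back.
    long combinedWatermark() { return Math.min(leftWatermark, rightWatermark); }

    // Lateness judged against the combined watermark (the behavior described in the issue).
    boolean isLateCombined(long timestamp) { return timestamp <= combinedWatermark(); }

    // Lateness judged against the left input's own watermark.
    boolean isLateForLeft(long timestamp) { return timestamp <= leftWatermark; }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker();
        tracker.onLeftWatermark(10_000L); // left stream is fast
        tracker.onRightWatermark(1_000L); // right stream lags behind

        long lateLeftRow = 5_000L; // well behind the left input's watermark
        System.out.println("combined watermark: " + tracker.combinedWatermark());          // 1000
        System.out.println("late (combined):    " + tracker.isLateCombined(lateLeftRow)); // false
        System.out.println("late (per input):   " + tracker.isLateForLeft(lateLeftRow));  // true
    }
}
```

Because the combined value advances only when the slower input does, a row that is clearly late on its own input can still be treated as on time and joined, and whether that happens depends on how fast each input progresses in a given run.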
> To observe this behavior, just run the test
> {{IntervalJoinITCase#testRowTimeInnerJoinWithEquiTimeAttrs}} in this revision
> https://github.com/apache/flink/commit/7033cbfe404bea1519d3342a611e2f92768d70f9
> several times and you'll see that after a couple of runs it fails, joining
> one of the {{"should-be-discarded"}} records, even though those records are
> well behind the watermark - 1 second, as defined.
> A small patch showing how this could be fixed is attached to the issue.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)