GitHub user fhueske commented on the issue:
https://github.com/apache/flink/pull/5094
Yes, the records emitted in your test are late, but only because the
records you feed into the join are already late. You set the watermarks on
both inputs to `6000` and then insert records that are late with respect to
those watermarks.
If you change the test to insert records such that they are not late:
```
testHarness.processElement1(new StreamRecord[CRow](
  CRow(Row.of(1000L: JLong, "k1"), true), 1000))
testHarness.processWatermark1(new Watermark(5999))
testHarness.processWatermark2(new Watermark(5999))
testHarness.processElement1(new StreamRecord[CRow](
  CRow(Row.of(6000L: JLong, "k1"), true), 6000))
testHarness.processElement2(new StreamRecord[CRow](
  CRow(Row.of(6000L: JLong, "k1"), true), 6000))
```
the result records are not late either.
If you change the watermarks to `6000`, the 2nd and 3rd records are received
as late data (because their timestamp is equal to the watermark) and
therefore the output is late as well.
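To make the rule explicit, here is a minimal sketch of the lateness check I
am describing. `LatenessSketch` and `isLate` are hypothetical names for
illustration only, not part of Flink's API or of the join operator; the
sketch only assumes that a watermark `W` declares that no more records with
a timestamp `<= W` are expected.

```scala
// Hypothetical helper for illustration; not Flink code.
object LatenessSketch {

  // A record is late if its timestamp is not strictly greater than the
  // watermark that has already been received.
  def isLate(timestamp: Long, watermark: Long): Boolean =
    timestamp <= watermark

  def main(args: Array[String]): Unit = {
    // Watermark 5999: a record with timestamp 6000 is still on time.
    println(isLate(timestamp = 6000L, watermark = 5999L))
    // Watermark 6000: the same record is late, because 6000 <= 6000.
    println(isLate(timestamp = 6000L, watermark = 6000L))
  }
}
```

With watermarks at `5999` the check is false for the records with timestamp
`6000`; with watermarks at `6000` it is true, which matches what the test
observes.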
I think the current logic is correct.
---