Sxnan commented on PR #23521:
URL: https://github.com/apache/flink/pull/23521#issuecomment-1813700885
> Hi @Sxnan! Sorry, I know I'm not a reviewer, but I happened to be testing the
> functionality from this MR recently, and I found a bug (in my opinion). It
> concerns the logic of `RecordAttributesValve`. In my test there was very little
> backlog data at the source, and the source had parallelism 4 (actually, any
> value > 1 reproduces the case). Because the backlog phase was so short, the
> interval between sending `RecordAttributes(isBacklog=true)` and
> `RecordAttributes(isBacklog=false)` was also very short. In addition, because
> of the parallelism, one source subtask could send
> `RecordAttributes(isBacklog=false)` even before another subtask sent
> `RecordAttributes(isBacklog=true)`. As a result, a race condition occurred in
> `RecordAttributesValve#inputRecordAttributes`: `backlogChannelsCnt` was
> incremented and decremented in an interleaved way, so it never reached
> `numInputChannels`, and no RecordAttributes was emitted from
> RecordAttributesValve at all.
>
> I suggest having separate counters for `RecordAttributes(isBacklog=true)` and
> `RecordAttributes(isBacklog=false)`, so that the race condition I mentioned
> earlier won't affect the result. Something like this:
>
> ```java
> if (recordAttributes.isBacklog()) {
>     backlogChannelsCnt += 1;
>     if (backlogChannelsCnt != numInputChannels) {
>         return;
>     }
>     backlogChannelsCnt = 0;
> } else {
>     nonBacklogChannelsCnt += 1;
>     if (nonBacklogChannelsCnt != numInputChannels) {
>         return;
>     }
>     nonBacklogChannelsCnt = 0;
> }
>
> if (lastOutputAttributes == null
>         || lastOutputAttributes.isBacklog() != recordAttributes.isBacklog()) {
>     if (lastOutputAttributes != null && !lastOutputAttributes.isBacklog()) {
>         LOG.warn(
>                 "Switching from non-backlog to backlog is currently not supported. Backlog status remains.");
>         return;
>     }
>     lastOutputAttributes = recordAttributes;
>     output.emitRecordAttributes(recordAttributes);
> }
> ```
>
> WDYT? Also, it would be good to have a test for the aforementioned case.
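
A small harness can show what the quoted separate-counter proposal would emit for the reported interleaving. This is a sketch under stated assumptions, not Flink code: `SeparateCounterModel` is a hypothetical class, `RecordAttributes` is reduced to its `isBacklog` flag, emitted statuses are collected in a list instead of going to `output.emitRecordAttributes`, and the `LOG.warn` call is replaced by a comment.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical harness for the separate-counter proposal quoted above.
 * RecordAttributes is reduced to its isBacklog flag, and emitted statuses
 * are collected in a list instead of being sent to Flink's output.
 */
public class SeparateCounterModel {
    private final int numInputChannels;
    private int backlogChannelsCnt = 0;
    private int nonBacklogChannelsCnt = 0;
    // Stands in for lastOutputAttributes; null until the first emission.
    private Boolean lastOutputBacklog = null;
    private final List<Boolean> emitted = new ArrayList<>();

    public SeparateCounterModel(int numInputChannels) {
        this.numInputChannels = numInputChannels;
    }

    public void inputRecordAttributes(boolean isBacklog) {
        // Count backlog=true and backlog=false messages independently.
        if (isBacklog) {
            backlogChannelsCnt += 1;
            if (backlogChannelsCnt != numInputChannels) {
                return;
            }
            backlogChannelsCnt = 0;
        } else {
            nonBacklogChannelsCnt += 1;
            if (nonBacklogChannelsCnt != numInputChannels) {
                return;
            }
            nonBacklogChannelsCnt = 0;
        }

        if (lastOutputBacklog == null || lastOutputBacklog != isBacklog) {
            if (lastOutputBacklog != null && !lastOutputBacklog) {
                // The proposal logs a warning here: non-backlog -> backlog is not supported.
                return;
            }
            lastOutputBacklog = isBacklog;
            emitted.add(isBacklog);
        }
    }

    public List<Boolean> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // Reported interleaving: each subtask's backlog phase ends before the next one
        // starts. The counters ignore which channel a message came from.
        SeparateCounterModel valve = new SeparateCounterModel(4);
        for (int channel = 0; channel < 4; channel++) {
            valve.inputRecordAttributes(true);
            valve.inputRecordAttributes(false);
        }
        System.out.println(valve.emitted()); // prints "[true, false]"
    }
}
```

Under this interleaving the proposal emits a backlog status followed by a non-backlog status, even though the four channels were never in backlog at the same time.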
Hi @Smir. Thanks for trying this out! The RecordAttributesValve combines the
RecordAttributes from the different input channels of the same input. The input
is considered to be in the backlog state if and only if all of its input
channels have isBacklog = true. Otherwise, i.e. if the input were treated as
backlog while some channels are already non-backlog, the non-backlog records
from those channels would be processed as backlog records.
Back to your case, where there is very little backlog data and the parallelism
is greater than 1: it is possible that one input channel switches to the
non-backlog state before the other input channels switch to the backlog state.
When that happens, not all channels are ever in backlog at the same time, so the
input is never considered backlog, and we simply process that data as
non-backlog data.
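The combining rule described above ("backlog if and only if all input channels are backlog") can be sketched by tracking each channel's latest status instead of counting messages. This is a minimal sketch, not Flink's `RecordAttributesValve`. `PerChannelBacklogModel` and its methods are hypothetical. Waiting until every channel has reported once is an assumption. The non-backlog to backlog guard mirrors the snippet quoted above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical sketch, NOT Flink's RecordAttributesValve: the input is in the
 * backlog state if and only if every input channel's latest status is isBacklog=true.
 */
public class PerChannelBacklogModel {
    // Latest isBacklog value per input channel; null until that channel has reported.
    private final Boolean[] channelBacklog;
    // Last combined status emitted downstream; null until the first emission.
    private Boolean lastOutputBacklog = null;
    private final List<Boolean> emitted = new ArrayList<>();

    public PerChannelBacklogModel(int numInputChannels) {
        this.channelBacklog = new Boolean[numInputChannels];
    }

    public void inputRecordAttributes(int channel, boolean isBacklog) {
        channelBacklog[channel] = isBacklog;
        // Assumption: nothing is emitted until every channel has reported once.
        if (Arrays.stream(channelBacklog).anyMatch(Objects::isNull)) {
            return;
        }
        boolean inputBacklog = Arrays.stream(channelBacklog).allMatch(Boolean::booleanValue);
        if (lastOutputBacklog != null && lastOutputBacklog == inputBacklog) {
            return; // combined status unchanged
        }
        if (lastOutputBacklog != null && !lastOutputBacklog) {
            return; // non-backlog -> backlog is not supported
        }
        lastOutputBacklog = inputBacklog;
        emitted.add(inputBacklog);
    }

    public List<Boolean> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        // Reported case: each channel leaves backlog before the next one enters it.
        PerChannelBacklogModel shortBacklog = new PerChannelBacklogModel(4);
        for (int channel = 0; channel < 4; channel++) {
            shortBacklog.inputRecordAttributes(channel, true);
            shortBacklog.inputRecordAttributes(channel, false);
        }
        System.out.println(shortBacklog.emitted()); // prints "[false]"

        // Overlapping case: all channels are in backlog at the same time.
        PerChannelBacklogModel overlapping = new PerChannelBacklogModel(4);
        for (int channel = 0; channel < 4; channel++) {
            overlapping.inputRecordAttributes(channel, true);
        }
        for (int channel = 0; channel < 4; channel++) {
            overlapping.inputRecordAttributes(channel, false);
        }
        System.out.println(overlapping.emitted()); // prints "[true, false]"
    }
}
```

Because the sketch keeps per-channel state, the result depends only on which channels are currently in backlog, not on how their messages interleave. In the reported case the input is only ever reported as non-backlog.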