cmick commented on pull request #16472: URL: https://github.com/apache/flink/pull/16472#issuecomment-887052378
@fapaul I've added 2 integration tests in 7094db8:

- `testMessageDelivery` - a simple test in which we verify that the source actually consumes the messages
- `testAckFailure` - a test reproducing the issue described in FLINK-23183 (the worst possible case, actually). Exactly-once mode is enabled here (checkpointing and correlationIds) and the source always fails on acknowledgement (I extended this part with a custom implementation). This test fails on the current master.

What happens exactly, step by step:

1. The source consumes N messages (here N=5 because prefetchCount is set)
2. A checkpoint is triggered and completes; all correlationIds are stored in the checkpoint
3. The source starts the ACK procedure. This can fail at any time, for instance due to a connection error. And when it does (here always), we have a problem
4. The job restarts from the last checkpoint
5. RabbitMQ redelivers all N messages (as no ACK was sent) and, after receiving them, the source stops consuming because prefetchCount is set
6. If the redelivered messages are just ignored (as on current master), the job gets stuck and will never process new messages. If the ACK is sent (`basicReject` in this PR), the transaction is committed on the next checkpoint, and the source resumes normal consumption afterwards

So the problem is that the previous implementation (on master) does not send an ACK with the corresponding `deliveryTag` for some of the messages (it should for all of them). It just ignores them. The initial idea was just to put all message deliveryTags (redelivered or not) into the checkpointed state. This would work, but the proposed solution will have better performance (fewer state operations) and is semantically correct.

WDYT?
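To make the stall in steps 5 and 6 concrete, here is a minimal, self-contained sketch of the prefetch behaviour. It is a toy model, not the RabbitMQ client API and not the Flink source code: `ToyBroker`, `deliver`, `settle`, and `run` are hypothetical names introduced only for illustration. It also simplifies two things: redelivered messages are settled immediately rather than in a transaction committed on the next checkpoint, and new messages are settled as soon as they are processed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a source reading from a broker with a prefetch limit (illustrative only). */
final class PrefetchStallSketch {

    /** Hypothetical in-memory stand-in for a channel that has prefetchCount set. */
    static final class ToyBroker {
        private final Deque<String> queue = new ArrayDeque<>();
        private final Set<String> unsettled = new HashSet<>();
        private final int prefetchCount;

        ToyBroker(int prefetchCount) {
            this.prefetchCount = prefetchCount;
        }

        void publish(String id) {
            queue.addLast(id);
        }

        /** Returns the next message, or null if the prefetch window is full or the queue is empty. */
        String deliver() {
            if (unsettled.size() >= prefetchCount || queue.isEmpty()) {
                return null;
            }
            String id = queue.removeFirst();
            unsettled.add(id);
            return id;
        }

        /** Settles a delivery (stands in for an ack or reject), freeing one prefetch slot. */
        void settle(String id) {
            unsettled.remove(id);
        }
    }

    /**
     * Simulates the source after a restart from a checkpoint that already contains m1..m5.
     * Returns the new messages the source managed to process.
     */
    static List<String> run(boolean settleRedelivered) {
        ToyBroker broker = new ToyBroker(5); // N = 5, as in the test described above
        Set<String> checkpointedIds = new HashSet<>(Arrays.asList("m1", "m2", "m3", "m4", "m5"));

        // The broker redelivers m1..m5 first (no ACK reached it), then the new messages.
        for (String id : Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6", "m7", "m8")) {
            broker.publish(id);
        }

        List<String> processed = new ArrayList<>();
        String id;
        while ((id = broker.deliver()) != null) {
            if (checkpointedIds.contains(id)) {
                // Already processed before the restart: skip it, but optionally settle it.
                if (settleRedelivered) {
                    broker.settle(id);
                }
                // If it is only ignored, it keeps occupying a prefetch slot forever.
            } else {
                processed.add(id);
                broker.settle(id); // simplification: settle new messages right away
            }
        }
        return processed;
    }
}
```

In this model, `PrefetchStallSketch.run(false)` returns an empty list because the five ignored redeliveries fill the prefetch window, while `PrefetchStallSketch.run(true)` goes on to process `m6`, `m7`, and `m8`.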
