senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r499228712
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -219,7 +221,7 @@ public void testCheckpointing() throws Exception {
assertEquals(numIds, messageIds.size());
if (messageIds.size() > 0) {
-
assertTrue(messageIds.contains(Long.toString(lastSnapshotId)));
+
assertTrue(messageIds.contains(Long.toString(lastSnapshotId - 1)));
Review comment:
I spent 2 hours tracking down the answer of this one as i completely
forgot the answer ! i had to check the operator state stored and retrieved
between upstream/master and origin ! it was a fun ride !
Here is the answer:
the `messageId` gets incremented every time the `Envelope.getDeliveryTag` is
called from the mocked `Envelope`
https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L607-L612
`messageId` is also used as the `correlationID`
https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L618-L623
so if you get the `correlationID` before `getDeliveryTag` you get
`correlationID: 0, deliveryTag: 1` and the opposite you get `correlationID: 1,
deliveryTag: 1`
in the [previous
code](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L248-L250)
`getDeliveryTag` was called before `getCorrelationID` resulting in the state
stored
[here](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L254)
to start with `1` which is later retrieved and checked in the
[testCheckpointing](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L213-L223)
for it's presence
in the new code the `getDeliveryTag` is called after `getCorrelationId`
https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L286
which would make the state stored starts from `0` instead of `1`.
This is why i am subtracting `1` from the value as we're starting the list
from `0` not `1`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]