senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r499228712



##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -219,7 +221,7 @@ public void testCheckpointing() throws Exception {
 
                        assertEquals(numIds, messageIds.size());
                        if (messageIds.size() > 0) {
-                               
assertTrue(messageIds.contains(Long.toString(lastSnapshotId)));
+                               
assertTrue(messageIds.contains(Long.toString(lastSnapshotId - 1)));

Review comment:
       I spent 2 hours tracking down the answer of this one as i completely 
forgot the answer ! i have to check the operator state stored and retrieved 
between upstream/master and origin ! it was a fun ride !
   Here is the answer:
   
   the `messageId` gets incremented every time the `Envelope.getDeliveryTag` is 
called from the mocked `Envelope` 
https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L607-L612
   
   `messageId` is also used as the `correlationID` 
https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L618-L623
   
   so if you get the `correlationID` before `getDeliveryTag` you get 
`correlationID: 0, deliveryTag: 1` and the opposite you get `correlationID: 1, 
deliveryTag: 1`
   
   in the [previous 
code](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L248-L250)
 `getDeliveryTag` was called before `getCorrelationID` resulting in the state 
stored 
[here](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L254)
 to start with `1` which is later retrieved and checked in the 
[testCheckpointing](https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java#L213-L223)
 for it's presence 
   
   in the new code the `getDeliveryTag` is called after `getCorrelationId` 
https://github.com/apache/flink/blob/b9c536699ab90573a07e283f815e77ee60e03143/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java#L286
 which would make the state stored starts from `0` instead of `1`. 
   
   This is why i am subtracting `1` from the value as we're starting the list 
from `0` not `1`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to