senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r504691988



##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -268,54 +304,141 @@ public void run(SourceContext<OUT> ctx) throws Exception {
                }
        }
 
-       private class RMQCollector implements Collector<OUT> {
+       @Override
+       public void cancel() {
+               running = false;
+       }
+
+       @Override
+       protected void acknowledgeSessionIDs(List<Long> sessionIds) {
+               try {
+                       for (long id : sessionIds) {
+                               channel.basicAck(id, false);
+                       }
+                       channel.txCommit();
+               } catch (IOException e) {
+                       throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
+               }
+       }
+
+       @Override
+       public TypeInformation<OUT> getProducedType() {
+               return deliveryDeserializer.getProducedType();
+       }
 
+       /**
+        * Special collector for RMQ messages.
+        * Captures the correlation ID and delivery tag; also does the filtering logic for whether a message has been
+        * processed or not.
+        */
+       private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {

Review comment:
       I think it's fine; it's just missing a reset method that resets
`customIdentifiersSet`, `correlationId`, and `deliveryTag` whenever a new message
is delivered.
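
To make the suggestion concrete, here is a minimal, self-contained sketch of what such a reset could look like. It is not the code from this PR: the class `SketchCollector`, its `String` record type, the getters, and the default values chosen in `reset()` are all assumptions for illustration. Only the three field names come from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

public class CollectorResetSketch {

    /** Hypothetical stand-in for the collector under review; not Flink's class. */
    static class SketchCollector {
        private final List<String> collected = new ArrayList<>();

        // Per-message state named in the review comment.
        private boolean customIdentifiersSet = false;
        private String correlationId = null;
        private long deliveryTag = 0L;

        /** Records the identifiers for the message currently being handled. */
        void setMessageIdentifiers(String correlationId, long deliveryTag) {
            if (customIdentifiersSet) {
                throw new IllegalStateException(
                    "Identifiers were already set for the current message.");
            }
            this.correlationId = correlationId;
            this.deliveryTag = deliveryTag;
            this.customIdentifiersSet = true;
        }

        /** Stores a deserialized record (simplified: no deduplication here). */
        void collect(String record) {
            collected.add(record);
        }

        /** Clears per-message state; assumed to be called once per delivery. */
        void reset() {
            customIdentifiersSet = false;
            correlationId = null;
            deliveryTag = 0L;
        }

        boolean isCustomIdentifiersSet() {
            return customIdentifiersSet;
        }

        String getCorrelationId() {
            return correlationId;
        }

        long getDeliveryTag() {
            return deliveryTag;
        }

        List<String> getCollected() {
            return collected;
        }
    }
}
```

In this sketch the consuming loop would call `reset()` before handing each new delivery to the deserializer, so identifiers left over from the previous message can never be attributed to the next one.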





