senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r504691988
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -268,54 +304,141 @@ public void run(SourceContext<OUT> ctx) throws Exception {
}
}
- private class RMQCollector implements Collector<OUT> {
+ @Override
+ public void cancel() {
+ running = false;
+ }
+
+ @Override
+ protected void acknowledgeSessionIDs(List<Long> sessionIds) {
+ try {
+ for (long id : sessionIds) {
+ channel.basicAck(id, false);
+ }
+ channel.txCommit();
+ } catch (IOException e) {
+ throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
+ }
+ }
+
+ @Override
+ public TypeInformation<OUT> getProducedType() {
+ return deliveryDeserializer.getProducedType();
+ }
+ /**
+ * Special collector for RMQ messages.
+ * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been
+ * processed or not.
+ */
+ private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {
Review comment:
I think it's fine; it's just missing a reset method that resets
`customIdentifiersSet`, `correlationId`, and `deliveryTag` whenever a new
message is delivered.
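To illustrate what I mean, here is a rough sketch of the reset behaviour. It is not the code from this PR: `CollectorStateSketch`, `setMessageIdentifiers`, and the getters are hypothetical names made up for the example, and only the three field names come from the comment above. The assumption is that the source would call `reset()` once per delivery, before handing the message to the deserialization schema.

```java
// Hypothetical, simplified stand-in for the collector under review.
// It is NOT the Flink class; it only models the three per-message fields.
class CollectorStateSketch {
    private boolean customIdentifiersSet;
    private String correlationId;
    private long deliveryTag;

    // Assumed setter: records the identifiers of the message being processed.
    void setMessageIdentifiers(String correlationId, long deliveryTag) {
        if (customIdentifiersSet) {
            throw new IllegalStateException(
                "Identifiers were already set for this message.");
        }
        this.correlationId = correlationId;
        this.deliveryTag = deliveryTag;
        this.customIdentifiersSet = true;
    }

    // The missing piece: clear all per-message state so that values from the
    // previous delivery cannot leak into the next one.
    void reset() {
        customIdentifiersSet = false;
        correlationId = null;
        deliveryTag = 0L;
    }

    boolean isCustomIdentifiersSet() {
        return customIdentifiersSet;
    }

    String getCorrelationId() {
        return correlationId;
    }

    long getDeliveryTag() {
        return deliveryTag;
    }

    public static void main(String[] args) {
        CollectorStateSketch collector = new CollectorStateSketch();

        // First delivery.
        collector.reset();
        collector.setMessageIdentifiers("first", 1L);
        System.out.println(collector.getCorrelationId() + " / " + collector.getDeliveryTag());

        // Second delivery: without the reset, the setter above would throw
        // because the flag from the first delivery would still be set.
        collector.reset();
        collector.setMessageIdentifiers("second", 2L);
        System.out.println(collector.getCorrelationId() + " / " + collector.getDeliveryTag());
    }
}
```

Whether the real collector should throw on a second call to the setter, or silently overwrite, is a separate design choice; the sketch throws only to make a stale flag visible.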