senegalo commented on a change in pull request #12056: URL: https://github.com/apache/flink/pull/12056#discussion_r446638002
##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -239,25 +294,75 @@ public void run(SourceContext<OUT> ctx) throws Exception {
         }
     }

-    private class RMQCollector implements Collector<OUT> {
-
+    /**
+     * Special collector for RMQ messages.
+     * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been
+     * processed or not.
+     */
+    private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {
         private final SourceContext<OUT> ctx;
         private boolean endOfStreamSignalled = false;
+        private long deliveryTag;
+        private Boolean preCheckFlag;

-        private RMQCollector(SourceContext<OUT> ctx) {
+        private RMQCollectorImpl(SourceContext<OUT> ctx) {
             this.ctx = ctx;
         }

         @Override
         public void collect(OUT record) {
-            if (endOfStreamSignalled || schema.isEndOfStream(record)) {
-                this.endOfStreamSignalled = true;
+            Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" +
+                "calling this method !");
+
+            if (!preCheckFlag) {
                 return;
             }

+            if (isEndOfStream(record)) {
+                this.endOfStreamSignalled = true;
+                return;
+            }
             ctx.collect(record);
         }

+        public void collect(List<OUT> records) {
+            Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" +
+                "calling this method !");
+
+            if (!preCheckFlag) {
+                return;
+            }
+
+            for (OUT record : records){
+                if (isEndOfStream(record)) {
+                    this.endOfStreamSignalled = true;
+                    return;
+                }
+                ctx.collect(record);
+            }
+        }
+
+        public void setMessageIdentifiers(String correlationId, long deliveryTag){
+            preCheckFlag = true;
+            if (!autoAck) {
+                if (usesCorrelationId) {
+                    Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
+                        "with usesCorrelationId set to true yet we couldn't extract the correlation id from it !");
+                    if (!addId(correlationId)) {
+                        // we have already processed this message
+                        preCheckFlag = false;
+                    }
+                }
+                sessionIds.add(deliveryTag);
+            }
+        }

Review comment:
That was actually my initial approach, but here is what I ran into that made me split out the validation logic and write the code the way I did. Given that we might also be calling `collect(List<T> record)`, we would have to either:
* copy-paste the validation logic so that it also runs there, or
* make `collect(List<T> record)` call `collect(T record)` internally to add the records one by one, but then the validation code would run once per record and yield the same result every time.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
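To illustrate the trade-off described in the review comment, here is a minimal, self-contained sketch of the "validate once per message, then let both `collect` overloads read the cached result" idea. It is not the Flink code: `DedupCollectorSketch`, `seenIds`, `accept`, `validationRuns`, and `emitted()` are hypothetical names chosen for the example, records are plain strings, and delivery tags, `autoAck`, and end-of-stream handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for a collector that deduplicates by correlation ID. */
class DedupCollectorSketch {
    // Assumption: a simple in-memory set plays the role of the source's "already seen" IDs.
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> emitted = new ArrayList<>();

    // null until setMessageIdentifiers has been called for the current message.
    private Boolean accept;

    // Counts how often the validation actually runs (for demonstration only).
    int validationRuns = 0;

    /** Runs the duplicate check once per incoming message and caches the result. */
    public void setMessageIdentifiers(String correlationId) {
        validationRuns++;
        // Set.add returns false when the ID was already present, i.e. a duplicate.
        accept = seenIds.add(correlationId);
    }

    /** Emits a single record if the current message passed validation. */
    public void collect(String record) {
        requireIdentifiers();
        if (accept) {
            emitted.add(record);
        }
    }

    /** Emits several records from one message without re-running validation. */
    public void collect(List<String> records) {
        requireIdentifiers();
        if (accept) {
            emitted.addAll(records);
        }
    }

    public List<String> emitted() {
        return emitted;
    }

    private void requireIdentifiers() {
        if (accept == null) {
            throw new IllegalStateException(
                "setMessageIdentifiers must be called before collect");
        }
    }

    public static void main(String[] args) {
        DedupCollectorSketch collector = new DedupCollectorSketch();

        collector.setMessageIdentifiers("id-1");
        collector.collect(Arrays.asList("a", "b")); // first delivery: both records kept

        collector.setMessageIdentifiers("id-1");
        collector.collect("c"); // same correlation ID again: dropped

        collector.setMessageIdentifiers("id-2");
        collector.collect("d"); // new correlation ID: kept

        System.out.println(collector.emitted());
        System.out.println(collector.validationRuns);
    }
}
```

In this sketch the duplicate check runs once per message however many records the message produces. That is the property the comment is after when it rejects both duplicating the check in each overload and routing the list overload through the single-record one.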