senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446638002



##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -239,25 +294,75 @@ public void run(SourceContext<OUT> ctx) throws Exception {
                }
        }
 
-       private class RMQCollector implements Collector<OUT> {
-
+       /**
+        * Special collector for RMQ messages.
+        * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been
+        * processed or not.
+        */
+       private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {
                private final SourceContext<OUT> ctx;
                private boolean endOfStreamSignalled = false;
+               private long deliveryTag;
+               private Boolean preCheckFlag;
 
-               private RMQCollector(SourceContext<OUT> ctx) {
+               private RMQCollectorImpl(SourceContext<OUT> ctx) {
                        this.ctx = ctx;
                }
 
                @Override
                public void collect(OUT record) {
-                       if (endOfStreamSignalled || schema.isEndOfStream(record)) {
-                               this.endOfStreamSignalled = true;
+                       Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" +
+                               "calling this method !");
+
+                       if (!preCheckFlag) {
                                return;
                        }
 
+                       if (isEndOfStream(record)) {
+                               this.endOfStreamSignalled = true;
+                               return;
+                       }
                        ctx.collect(record);
                }
 
+               public void collect(List<OUT> records) {
+                       Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" +
+                               "calling this method !");
+
+                       if (!preCheckFlag) {
+                               return;
+                       }
+
+                       for (OUT record : records){
+                               if (isEndOfStream(record)) {
+                                       this.endOfStreamSignalled = true;
+                                       return;
+                               }
+                               ctx.collect(record);
+                       }
+               }
+
+               public void setMessageIdentifiers(String correlationId, long deliveryTag){
+                       preCheckFlag = true;
+                       if (!autoAck) {
+                               if (usesCorrelationId) {
+                                       Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
+                                               "with usesCorrelationId set to true yet we couldn't extract the correlation id from it !");
+                                       if (!addId(correlationId)) {
+                                               // we have already processed this message
+                                               preCheckFlag = false;
+                                       }
+                               }
+                               sessionIds.add(deliveryTag);
+                       }
+               }

Review comment:
   That was actually my initial approach, but here is what I ran into that made me split out the validation logic and write the code the way I did.
   
   Given that we might also be calling `collect(List<T> record)`, we would have to either:
   * copy-paste the validation logic so that it is also checked there, or
   * make `collect(List<T> record)` call `collect(T record)` internally to add the individual records, but then the validation code would run once per record, yielding the same result every time.
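
   To make the second point concrete, here is a minimal, hypothetical sketch. It is not the actual `RMQSource` code: `CollectorSketch`, `validationRuns`, `collectByDelegating`, and the simplified `setMessageIdentifiers(boolean)` are made-up names for illustration, and a plain list stands in for `SourceContext`. It only shows that delegating to the single-record `collect` repeats the same check once per record, while checking once up front does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/** Hypothetical, simplified stand-in for the collector in the diff; not Flink code. */
class CollectorSketch<OUT> {
    // Stands in for SourceContext<OUT>; assumption made for illustration only.
    private final List<OUT> emitted = new ArrayList<>();
    // Null until the message identifiers have been set, as in the diff.
    private Boolean preCheckFlag;
    // Counts how many times the validation logic executes.
    int validationRuns = 0;

    /** Simplified: the real method takes a correlation ID and a delivery tag. */
    void setMessageIdentifiers(boolean alreadyProcessed) {
        preCheckFlag = !alreadyProcessed;
    }

    /** The validation step; its result is the same for every record of one message. */
    private boolean shouldCollect() {
        validationRuns++;
        Objects.requireNonNull(preCheckFlag, "setMessageIdentifiers must be called first");
        return preCheckFlag;
    }

    void collect(OUT record) {
        if (shouldCollect()) {
            emitted.add(record);
        }
    }

    /** Validates once for the whole batch. */
    void collect(List<OUT> records) {
        if (shouldCollect()) {
            emitted.addAll(records);
        }
    }

    /** The delegating alternative: validation runs once per record. */
    void collectByDelegating(List<OUT> records) {
        for (OUT record : records) {
            collect(record);
        }
    }

    List<OUT> emitted() {
        return emitted;
    }
}
```

   With three records, `collect(List)` bumps `validationRuns` by one, whereas `collectByDelegating` bumps it by three for an identical outcome, which is the redundancy described above.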




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

