dawidwys commented on pull request #12056:
URL: https://github.com/apache/flink/pull/12056#issuecomment-633982361


   Hmm I missed the filtering logic... 
   This makes it a bit harder as it implies additional contract on the 
`Collector` if we separate emitting records from correlationId assignment to 
sth like:
   ```
   interface RMQCollector<T> {
       void collect(T record);
       void setCorrelationId(String correlationId); // throw exception if 
called multiple times
   }
   ```
   Then we force users to call the `setCorrelationId` first before the 
`collect`.
   
   In that case I would think of a slightly different interface:
   ```
   interface RMQCollector<T> {
       void collect(T record); // uses the default correlation from properties
       void collect(String correlationId, T record);
   }
   ```
   and then as you said we would have to move the filtering logic into the 
collector. Additionally we would have to rework it slightly so that 
incorporates that the `RMQCollector#collect()` can be called multiple times 
with the same `correlationId` in a single pass `deserialize`. This should be 
doable though. What do you think?
   
   As for the `isEndOfStream` I think its not too hard to support it to keep 
the same functionality as it was before (even if very fragile in regards to 
checkpointing the last checkpoint etc.).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to