dawidwys commented on pull request #12056:
URL: https://github.com/apache/flink/pull/12056#issuecomment-633982361
Hmm I missed the filtering logic...
This makes it a bit harder as it implies additional contract on the
`Collector` if we separate emitting records from correlationId assignment to
sth like:
```
interface RMQCollector<T> {
void collect(T record);
void setCorrelationId(String correlationId); // throw exception if
called multiple times
}
```
Then we force users to call the `setCorrelationId` first before the
`collect`.
In that case I would think of a slightly different interface:
```
interface RMQCollector<T> {
void collect(T record); // uses the default correlation from properties
void collect(String correlationId, T record);
}
```
and then as you said we would have to move the filtering logic into the
collector. Additionally we would have to rework it slightly so that
incorporates that the `RMQCollector#collect()` can be called multiple times
with the same `correlationId` in a single pass `deserialize`. This should be
doable though. What do you think?
As for the `isEndOfStream` I think its not too hard to support it to keep
the same functionality as it was before (even if very fragile in regards to
checkpointing the last checkpoint etc.).
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]