dawidwys commented on pull request #12056:
URL: https://github.com/apache/flink/pull/12056#issuecomment-633433806


   Hey,
   Thanks for working on this. I was wondering if we could/should replace the 
`RMQDeserializedMessage` with an emitter parameter pattern 
(https://www.morling.dev/blog/emitter-parameter-pattern-for-flexible-spis/).
   
   If we changed the 
`org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchema#processMessage`
 to 
   
   ```
   void processMessage(
       Envelope envelope, 
       AMQP.BasicProperties properties, 
       byte[] body,
       RMQCollector<T> collector
   ) throws IOException;
   
   interface RMQCollector<T> {
       void collect(T record);
       // Throws an exception if called multiple times.
       void setCorrelationId(String correlationId);
   }
   ```
   
   I think we could wrap the `DeserializationSchema` directly in that interface,
similar to what other connectors do.
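   
   To make the wrapper idea concrete, here is a rough sketch, not a definitive
implementation. `Envelope`, `AMQP.BasicProperties` and `DeserializationSchema`
below are minimal stand-ins so the snippet compiles on its own. The class name
`RMQDeserializationSchemaWrapper` and the choice to forward the correlation ID
from the message properties are assumptions on my side.

```java
import java.io.IOException;

// Stand-ins (assumptions) for com.rabbitmq.client.Envelope,
// com.rabbitmq.client.AMQP.BasicProperties and Flink's DeserializationSchema,
// reduced to what this sketch needs.
final class Envelope {}

final class AMQP {
    static final class BasicProperties {
        private final String correlationId;

        BasicProperties(String correlationId) {
            this.correlationId = correlationId;
        }

        String getCorrelationId() {
            return correlationId;
        }
    }
}

interface DeserializationSchema<T> {
    T deserialize(byte[] message) throws IOException;
}

// The two interfaces as proposed in this comment.
interface RMQCollector<T> {
    void collect(T record);

    void setCorrelationId(String correlationId);
}

interface RMQDeserializationSchema<T> {
    void processMessage(
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body,
            RMQCollector<T> collector) throws IOException;
}

// Hypothetical wrapper: adapts a plain DeserializationSchema to the new interface.
final class RMQDeserializationSchemaWrapper<T> implements RMQDeserializationSchema<T> {
    private final DeserializationSchema<T> schema;

    RMQDeserializationSchemaWrapper(DeserializationSchema<T> schema) {
        this.schema = schema;
    }

    @Override
    public void processMessage(
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body,
            RMQCollector<T> collector) throws IOException {
        // Assumption: the correlation ID is taken from the message properties.
        collector.setCorrelationId(properties.getCorrelationId());
        // Emit the single deserialized record directly, without an intermediate list.
        collector.collect(schema.deserialize(body));
    }
}
```

   With something along these lines, users of a plain `DeserializationSchema`
would not have to touch the collector at all.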
   
   The biggest benefit is that we do not need to buffer the elements in arrays,
so we create fewer objects on the hot path. The downside is that the contract
on `setCorrelationId` (that it is called only once) is not strictly enforced,
but I think it should be fine here.
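   
   As an illustration of both points, a collector could look roughly like the
sketch below. It hands each record straight to a downstream consumer instead of
buffering, and it only checks the `setCorrelationId` contract at runtime. The
class name `ForwardingRMQCollector`, the `Consumer` standing in for the source
context, and the `reset()` hook are hypothetical and not part of the PR.

```java
import java.util.function.Consumer;

// The collector interface as proposed in this comment.
interface RMQCollector<T> {
    void collect(T record);

    void setCorrelationId(String correlationId);
}

// Hypothetical implementation: no per-message buffer, records go downstream immediately.
final class ForwardingRMQCollector<T> implements RMQCollector<T> {
    // Assumption: stands in for whatever the source uses to emit records.
    private final Consumer<T> downstream;
    private String correlationId;
    private boolean correlationIdSet;

    ForwardingRMQCollector(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    @Override
    public void collect(T record) {
        downstream.accept(record);
    }

    @Override
    public void setCorrelationId(String correlationId) {
        // The "set only once" contract is checked here at runtime, not by the compiler.
        if (correlationIdSet) {
            throw new IllegalStateException("Correlation ID was already set for this message");
        }
        this.correlationId = correlationId;
        this.correlationIdSet = true;
    }

    String getCorrelationId() {
        return correlationId;
    }

    // Assumption: the source would call this before handing over the next message.
    void reset() {
        correlationId = null;
        correlationIdSet = false;
    }
}
```

   The same collector instance can be reused across messages, so the only
per-message state is the correlation ID.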
   
   The remaining question is whether we want to support `isEndOfStream` in the
new schema. If we do not and go with the wrapper approach, we would have to
find a workaround for it. At the same time, dropping it would be a regression
compared to the current schema, even if that method has a couple of problems
of its own.



