senegalo commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r446636112



##########
File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -208,29 +252,40 @@ public void close() throws Exception {
                }
        }
 
+       /**
+        * Parses and collects the body of an AMQP message.
+        *
+        * <p>If any of the constructors with the {@link DeserializationSchema} class was used to construct the source
+        * it uses the {@link DeserializationSchema#deserialize(byte[])} to parse the body of the AMQP message.
+        *
+        * <p>If any of the constructors with the {@link RMQDeserializationSchema } class was used to construct the source it uses the
+        * {@link RMQDeserializationSchema#processMessage(Envelope, AMQP.BasicProperties, byte[], RMQDeserializationSchema.RMQCollector collector)}
+        * method of that provided instance.
+        *
+        * @param delivery the AMQP {@link QueueingConsumer.Delivery}
+        * @param collector a {@link RMQCollectorImpl} to collect the data
+        * @throws IOException
+        */
+       protected void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector collector) throws IOException {
+               AMQP.BasicProperties properties = delivery.getProperties();
+               byte[] body = delivery.getBody();
+
+               if (deliveryDeserializer != null){
+                       Envelope envelope = delivery.getEnvelope();
+                       deliveryDeserializer.processMessage(envelope, properties, body, collector);
+               } else {
+                       collector.setMessageIdentifiers(properties.getCorrelationId(), delivery.getEnvelope().getDeliveryTag());
+                       collector.collect(schema.deserialize(body));
+               }
+       }

Review comment:
       Actually, after your awesome suggestion of wrapping the deserialization schema in the new interface, this method is now:
   
   ```
                AMQP.BasicProperties properties = delivery.getProperties();
                byte[] body = delivery.getBody();
                Envelope envelope = delivery.getEnvelope();
   
                deliveryDeserializer.deserialize(envelope, properties, body, collector);
   ```
   Hence my comment above: "Elegant solution!"
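
For readers following along, here is a rough sketch of the wrapping idea. It is not the code from this PR: `BodySchema`, `DeliverySchema`, `Collector`, `BodySchemaWrapper` and `ListCollector` are hypothetical stand-ins for the Flink and RabbitMQ types, and the envelope and properties are reduced to a delivery tag and a correlation ID so the example runs on its own. The point is that the old `else` branch moves into an adapter, so the source only ever calls one interface.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins only; none of these are the actual Flink or RabbitMQ types.
class WrapperSketch {

    /** Stand-in for a body-only schema (the role DeserializationSchema plays). */
    interface BodySchema<T> {
        T deserialize(byte[] body) throws IOException;
    }

    /** Stand-in for a collector that also records the message identifiers. */
    interface Collector<T> {
        void setMessageIdentifiers(String correlationId, long deliveryTag);
        void collect(T record);
    }

    /** Stand-in for the new delivery-aware interface (assumed shape). */
    interface DeliverySchema<T> {
        void deserialize(long deliveryTag, String correlationId, byte[] body, Collector<T> collector)
                throws IOException;
    }

    /** Adapter: the logic of the former else-branch lives here. */
    static final class BodySchemaWrapper<T> implements DeliverySchema<T> {
        private final BodySchema<T> schema;

        BodySchemaWrapper(BodySchema<T> schema) {
            this.schema = schema;
        }

        @Override
        public void deserialize(long deliveryTag, String correlationId, byte[] body, Collector<T> collector)
                throws IOException {
            // Set the identifiers first, then hand the parsed body to the collector.
            collector.setMessageIdentifiers(correlationId, deliveryTag);
            collector.collect(schema.deserialize(body));
        }
    }

    /** Simple in-memory collector used only for this demonstration. */
    static final class ListCollector<T> implements Collector<T> {
        final List<T> records = new ArrayList<>();
        String lastCorrelationId;
        long lastDeliveryTag;

        @Override
        public void setMessageIdentifiers(String correlationId, long deliveryTag) {
            this.lastCorrelationId = correlationId;
            this.lastDeliveryTag = deliveryTag;
        }

        @Override
        public void collect(T record) {
            records.add(record);
        }
    }

    /** Wraps a UTF-8 string schema and pushes one message through it. */
    static ListCollector<String> demo() {
        DeliverySchema<String> deliveryDeserializer =
                new BodySchemaWrapper<String>(body -> new String(body, StandardCharsets.UTF_8));
        ListCollector<String> collector = new ListCollector<>();
        try {
            // The caller needs no null check or branching any more.
            deliveryDeserializer.deserialize(
                    7L, "corr-1", "hello".getBytes(StandardCharsets.UTF_8), collector);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return collector;
    }

    public static void main(String[] args) {
        ListCollector<String> collector = demo();
        System.out.println(collector.records + " " + collector.lastCorrelationId
                + " " + collector.lastDeliveryTag);
    }
}
```

With an adapter along these lines, the constructors that accept the plain schema can wrap it once at construction time, which is presumably why the method body shrinks to a single call.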




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

