austince commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r440230648
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -208,29 +252,40 @@ public void close() throws Exception {
         }
     }
+    /**
+     * Parse and collects the body of the an AMQP message.
+     *
+     * <p>If any of the constructors with the {@link DeserializationSchema} class was used to construct the source
+     * it uses the {@link DeserializationSchema#deserialize(byte[])} to parse the body of the AMQP message.
+     *
+     * <p>If any of the constructors with the {@link RMQDeserializationSchema } class was used to construct the source it uses the
+     * {@link RMQDeserializationSchema#processMessage(Envelope, AMQP.BasicProperties, byte[], RMQDeserializationSchema.RMQCollector collector)}
+     * method of that provided instance.
+     *
+     * @param delivery the AMQP {@link QueueingConsumer.Delivery}
+     * @param collector a {@link RMQCollectorImpl} to collect the data
+     * @throws IOException
+     */
+    protected void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector collector) throws IOException {
+        AMQP.BasicProperties properties = delivery.getProperties();
+        byte[] body = delivery.getBody();
+
+        if (deliveryDeserializer != null){
+            Envelope envelope = delivery.getEnvelope();
+            deliveryDeserializer.processMessage(envelope, properties, body, collector);
+        } else {
+            collector.setMessageIdentifiers(properties.getCorrelationId(), delivery.getEnvelope().getDeliveryTag());
+            collector.collect(schema.deserialize(body));
+        }
+    }
Review comment:
If we wrap the schema and go with the `reset` method, this could be
simplified to:
```java
void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector<OUT> collector) throws IOException {
    AMQP.BasicProperties properties = delivery.getProperties();
    byte[] body = delivery.getBody();
    Envelope envelope = delivery.getEnvelope();
    // ensure the collector knows a new message is being processed
    collector.reset();
    schema.deserialize(envelope, properties, body, collector);
}
```
We may also want to keep this package-private until there is a use case for
overriding it?
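
To illustrate what "wrapping the schema" could look like, here is a rough, self-contained sketch. All of the types below (`BodySchema`, `DeliverySchema`, `MessageCollector`, `BodySchemaWrapper`, `RecordingCollector`) are hypothetical stand-ins so the example compiles on its own; they are not the actual Flink or RabbitMQ classes, and the real `Envelope` and `AMQP.BasicProperties` arguments are reduced to a correlation ID and a delivery tag.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a body-only schema such as DeserializationSchema.
interface BodySchema<T> {
    T deserialize(byte[] body) throws IOException;
}

// Hypothetical stand-in for the collector discussed in this review.
interface MessageCollector<T> {
    void reset();
    void setMessageIdentifiers(String correlationId, long deliveryTag);
    void collect(T record);
}

// Hypothetical stand-in for a delivery-aware schema such as RMQDeserializationSchema.
// Assumption: the envelope and properties are simplified to two plain values.
interface DeliverySchema<T> {
    void deserialize(String correlationId, long deliveryTag, byte[] body,
                     MessageCollector<T> collector) throws IOException;
}

// Adapts a body-only schema to the delivery-aware interface, so the source
// needs only one code path instead of an if/else on which schema was supplied.
final class BodySchemaWrapper<T> implements DeliverySchema<T> {
    private final BodySchema<T> inner;

    BodySchemaWrapper(BodySchema<T> inner) {
        this.inner = inner;
    }

    @Override
    public void deserialize(String correlationId, long deliveryTag, byte[] body,
                            MessageCollector<T> collector) throws IOException {
        // The wrapper takes over the bookkeeping the source did in its else branch.
        collector.setMessageIdentifiers(correlationId, deliveryTag);
        collector.collect(inner.deserialize(body));
    }
}

// Minimal in-memory collector used only to exercise the sketch.
final class RecordingCollector<T> implements MessageCollector<T> {
    final List<T> records = new ArrayList<>();
    String correlationId;
    long deliveryTag = -1L;

    @Override
    public void reset() {
        // Forget the identifiers of the previous message; keep collected records.
        correlationId = null;
        deliveryTag = -1L;
    }

    @Override
    public void setMessageIdentifiers(String correlationId, long deliveryTag) {
        this.correlationId = correlationId;
        this.deliveryTag = deliveryTag;
    }

    @Override
    public void collect(T record) {
        records.add(record);
    }
}
```

With something along these lines, the constructors that take a plain `DeserializationSchema` could wrap it once up front, and `processMessage` would always call the delivery-aware schema.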
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]