austince commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r440170857



##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+       /**
+        * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+        * collector.
+        * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+        * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+        * the {@link RMQSource}.
+        * @param envelope
+        * @param properties
+        * @param body
+        * @throws IOException
+        */
+       public  void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;

Review comment:
       I think this might be nicer as `deserialize`, to fit with other `DeserializationSchema` patterns.

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+       /**
+        * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+        * collector.
+        * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+        * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+        * the {@link RMQSource}.
+        * @param envelope
+        * @param properties
+        * @param body
+        * @throws IOException
+        */
+       public  void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. If
+        * true is returned the element won't be emitted.
+        *
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       public boolean isEndOfStream(T nextElement);
+
+       /**
+        * The {@link TypeInformation} for the deserialized T.
+        * As an example the proper implementation of this method if T is a String is:
+        * {@code return TypeExtractor.getForClass(String.class)}
+        * @return TypeInformation
+        */
+       public TypeInformation<T> getProducedType();
+
+
+       /**
+        * Special collector for RMQ messages.
+        * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been
+        * processed or not.
+        * @param <T>
+        */
+       public interface RMQCollector<T> extends Collector<T> {
+               public void collect(List<T> records);
+
+               public void setMessageIdentifiers(String correlationId, long deliveryTag);
+       }

Review comment:
       What do you think about this interface instead?
   
   ```java
   interface RMQCollector<T> extends Collector<T> {
       /**
        * Called before a new message is processed in order to clear any temporary state.
        */
       default void reset() {}

       void collect(String correlationId, long deliveryTag, T record);

       void collect(T record);
   }
   ```
   
   Instead of having the user call `setMessageIdentifiers`, we could call it ourselves from the collector implementation. We can also ensure that the same `correlationId` and `deliveryTag` are used for every record of a message by storing them in local state and clearing them with the `reset` method when the next message arrives. This would follow the emitter pattern of other collectors a bit more closely and still allow users to call `collect` multiple times per message.
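   A minimal sketch of a collector behind this interface, assuming it only needs to pin one `correlationId`/`deliveryTag` pair per message and that plain `collect(T)` is only valid once that pair has been set. `SimpleCollector` is a hypothetical stand-in for Flink's `Collector` so the snippet compiles on its own, and `StatefulRMQCollector` (also hypothetical) buffers records in a list instead of forwarding them to a `SourceContext`:

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Objects;

   // Hypothetical stand-in for org.apache.flink.util.Collector.
   interface SimpleCollector<T> {
       void collect(T record);

       void close();
   }

   interface RMQCollector<T> extends SimpleCollector<T> {
       /** Called before a new message is processed in order to clear any temporary state. */
       default void reset() {}

       void collect(String correlationId, long deliveryTag, T record);
   }

   // Sketch only: buffers records instead of emitting them to a SourceContext.
   class StatefulRMQCollector<T> implements RMQCollector<T> {
       final List<T> emitted = new ArrayList<>();
       private boolean identifiersSet;
       private String currentCorrelationId;
       private long currentDeliveryTag;

       @Override
       public void reset() {
           identifiersSet = false;
           currentCorrelationId = null;
       }

       @Override
       public void collect(String correlationId, long deliveryTag, T record) {
           if (identifiersSet) {
               // every record of one message must carry the same identifiers
               if (currentDeliveryTag != deliveryTag || !Objects.equals(currentCorrelationId, correlationId)) {
                   throw new IllegalStateException("Cannot use different identifiers for the same message");
               }
           } else {
               currentCorrelationId = correlationId;
               currentDeliveryTag = deliveryTag;
               identifiersSet = true;
           }
           emitted.add(record);
       }

       @Override
       public void collect(T record) {
           // plain collect() reuses the identifiers pinned earlier for this message
           if (!identifiersSet) {
               throw new IllegalStateException("collect(correlationId, deliveryTag, record) must be called first");
           }
           emitted.add(record);
       }

       @Override
       public void close() {}
   }
   ```

   Rejecting a bare `collect(T)` before the identifiers are known is a judgment call; it surfaces misuse early instead of silently emitting an unacknowledgeable record.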

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -77,6 +78,7 @@
        private final RMQConnectionConfig rmqConnectionConfig;
        protected final String queueName;
        private final boolean usesCorrelationId;
+       protected RMQDeserializationSchema<OUT> deliveryDeserializer;
        protected DeserializationSchema<OUT> schema;

Review comment:
       I think it would be nice if we always used the more specific `RMQDeserializationSchema` and wrapped the standard `DeserializationSchema` when one is passed to the constructor. This would allow us to remove the null checks/ternaries. We could wrap it like this:
   
   ```java
   static <T> RMQDeserializationSchema<T> wrapDeserializationSchema(DeserializationSchema<T> schema) {
       // explicit type argument: the diamond operator on anonymous classes requires Java 9+
       return new RMQDeserializationSchema<T>() {
           @Override
           public void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<T> collector) throws IOException {
               final long deliveryTag = envelope.getDeliveryTag();
               final String correlationId = properties.getCorrelationId();
               collector.collect(correlationId, deliveryTag, schema.deserialize(body));
           }

           @Override
           public TypeInformation<T> getProducedType() {
               return schema.getProducedType();
           }

           @Override
           public boolean isEndOfStream(T nextElement) {
               return schema.isEndOfStream(nextElement);
           }

           @Override
           public void open(DeserializationSchema.InitializationContext context) throws Exception {
               schema.open(context);
           }
       };
   }
   ```
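   To make the adapter idea concrete without the Flink and RabbitMQ dependencies, here is a self-contained sketch of the same wrapping pattern. `PlainSchema`, `DeliverySchema`, and `DeliveryCollector` are hypothetical stand-ins for `DeserializationSchema`, `RMQDeserializationSchema`, and `RMQCollector`, and the `Envelope`/`BasicProperties` pair is reduced to the delivery tag and correlation ID the wrapper actually reads:

   ```java
   import java.io.IOException;

   // Hypothetical stand-in for DeserializationSchema: body in, one record out.
   interface PlainSchema<T> {
       T deserialize(byte[] body) throws IOException;

       boolean isEndOfStream(T nextElement);
   }

   // Hypothetical stand-in for RMQCollector.
   interface DeliveryCollector<T> {
       void collect(String correlationId, long deliveryTag, T record);
   }

   // Hypothetical stand-in for RMQDeserializationSchema.
   interface DeliverySchema<T> {
       void deserialize(long deliveryTag, String correlationId, byte[] body, DeliveryCollector<T> collector)
               throws IOException;

       boolean isEndOfStream(T nextElement);
   }

   final class Schemas {
       private Schemas() {}

       // Adapts a body-only schema so the source only ever deals with DeliverySchema.
       static <T> DeliverySchema<T> wrap(PlainSchema<T> schema) {
           return new DeliverySchema<T>() {
               @Override
               public void deserialize(long deliveryTag, String correlationId, byte[] body,
                       DeliveryCollector<T> collector) throws IOException {
                   // forward the message identifiers alongside the single decoded record
                   collector.collect(correlationId, deliveryTag, schema.deserialize(body));
               }

               @Override
               public boolean isEndOfStream(T nextElement) {
                   return schema.isEndOfStream(nextElement);
               }
           };
       }
   }
   ```

   With this shape the source would hold a single schema field and never branch on which constructor was used. In the real connector the wrapper also has to stay serializable; an anonymous class created in a static method captures only `schema`, which works as long as the wrapped schema is itself serializable (as `DeserializationSchema` is).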

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+       /**
+        * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the
+        * collector.
+        * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with
+        * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled
+        * the {@link RMQSource}.
+        * @param envelope
+        * @param properties
+        * @param body
+        * @throws IOException
+        */
+       public  void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. If
+        * true is returned the element won't be emitted.
+        *
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       public boolean isEndOfStream(T nextElement);

Review comment:
       nit: We should be able to remove the `public` access modifiers on the interface methods as well, since interface members are implicitly public.
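   For reference, a small self-contained check of the implicit modifiers (the names here are made up for illustration): methods declared in an interface without modifiers are already `public abstract`, and a nested interface is implicitly `public static`. Implementing classes still have to declare their overrides `public`.

   ```java
   import java.lang.reflect.Method;

   // Hypothetical interface mirroring the shape of RMQDeserializationSchema, with no explicit modifiers.
   interface NoModifiers<T> {
       // implicitly public abstract
       boolean isEndOfStream(T nextElement);

       // implicitly public static
       interface Collector<T> {
           void collect(T record);
       }
   }

   final class ModifierCheck {
       private ModifierCheck() {}

       // Returns the reflective modifiers of the first declared method with the given name.
       static int methodModifiers(Class<?> type, String name) {
           for (Method m : type.getDeclaredMethods()) {
               if (m.getName().equals(name)) {
                   return m.getModifiers();
               }
           }
           throw new IllegalArgumentException("No method named " + name);
       }
   }
   ```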

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -208,29 +252,40 @@ public void close() throws Exception {
                }
        }
 
+       /**
+        * Parse and collects the body of the an AMQP message.
+        *
+        * <p>If any of the constructors with the {@link DeserializationSchema} class was used to construct the source
+        * it uses the {@link DeserializationSchema#deserialize(byte[])} to parse the body of the AMQP message.
+        *
+        * <p>If any of the constructors with the {@link RMQDeserializationSchema } class was used to construct the source it uses the
+        * {@link RMQDeserializationSchema#processMessage(Envelope, AMQP.BasicProperties, byte[], RMQDeserializationSchema.RMQCollector collector)}
+        * method of that provided instance.
+        *
+        * @param delivery the AMQP {@link QueueingConsumer.Delivery}
+        * @param collector a {@link RMQCollectorImpl} to collect the data
+        * @throws IOException
+        */
+       protected void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector collector) throws IOException {
+               AMQP.BasicProperties properties = delivery.getProperties();
+               byte[] body = delivery.getBody();
+
+               if (deliveryDeserializer != null){
+                       Envelope envelope = delivery.getEnvelope();
+                       deliveryDeserializer.processMessage(envelope, properties, body, collector);
+               } else {
+                       collector.setMessageIdentifiers(properties.getCorrelationId(), delivery.getEnvelope().getDeliveryTag());
+                       collector.collect(schema.deserialize(body));
+               }
+       }

Review comment:
       If we wrap the schema and go with the `reset` method, this could be simplified to:
   
   ```java
   protected void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector<OUT> collector) throws IOException {
       AMQP.BasicProperties properties = delivery.getProperties();
       byte[] body = delivery.getBody();
       Envelope envelope = delivery.getEnvelope();
       // ensure the collector knows a new message is being processed
       collector.reset();
       schema.deserialize(envelope, properties, body, collector);
   }
   ```
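   A self-contained sketch of why `reset()` sits at the top of every `processMessage` call, using hypothetical stand-ins (`Delivery`, `ResettableCollector`, `MessageSchema`, `TagCheckingCollector`, `SourceLoop`) rather than the RabbitMQ and Flink types. The collector pins the first delivery tag it sees; if `reset()` were skipped, the next delivery's tag would clash with the one left over from the previous message:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in for QueueingConsumer.Delivery, reduced to the fields used here.
   final class Delivery {
       final long deliveryTag;
       final String correlationId;
       final String body;

       Delivery(long deliveryTag, String correlationId, String body) {
           this.deliveryTag = deliveryTag;
           this.correlationId = correlationId;
           this.body = body;
       }
   }

   interface ResettableCollector {
       void reset();

       void collect(String correlationId, long deliveryTag, String record);
   }

   interface MessageSchema {
       void deserialize(Delivery delivery, ResettableCollector collector);
   }

   // Pins the first delivery tag seen since the last reset and rejects any other tag.
   final class TagCheckingCollector implements ResettableCollector {
       final List<String> emitted = new ArrayList<>();
       private Long currentDeliveryTag;

       @Override
       public void reset() {
           currentDeliveryTag = null;
       }

       @Override
       public void collect(String correlationId, long deliveryTag, String record) {
           if (currentDeliveryTag != null && currentDeliveryTag != deliveryTag) {
               throw new IllegalStateException("Cannot use different delivery tags for the same message");
           }
           currentDeliveryTag = deliveryTag;
           emitted.add(record);
       }
   }

   final class SourceLoop {
       private SourceLoop() {}

       // Mirrors the simplified processMessage: clear per-message state, then hand off to the schema.
       static void processMessage(Delivery delivery, MessageSchema schema, ResettableCollector collector) {
           collector.reset();
           schema.deserialize(delivery, collector);
       }
   }
   ```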

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -382,6 +414,27 @@ public boolean isEndOfStream(String nextElement) {
                }
        }
 
+       private class CustomDeserializationSchema implements RMQDeserializationSchema<String> {
+               @Override
+               public TypeInformation<String> getProducedType() {
+                       return TypeExtractor.getForClass(String.class);
+               }
+
+               @Override
+               public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException {
+                       List<String> messages = new ArrayList();
+                       messages.add("I Love Turtles");
+                       messages.add("Brush your teeth");

Review comment:
        :paintbrush:  :grimacing:

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -239,25 +294,75 @@ public void run(SourceContext<OUT> ctx) throws Exception {
                }
        }
 
-       private class RMQCollector implements Collector<OUT> {
-
+       /**
+        * Special collector for RMQ messages.
+        * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been
+        * processed or not.
+        */
+       private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {
                private final SourceContext<OUT> ctx;
                private boolean endOfStreamSignalled = false;
+               private long deliveryTag;
+               private Boolean preCheckFlag;
 
-               private RMQCollector(SourceContext<OUT> ctx) {
+               private RMQCollectorImpl(SourceContext<OUT> ctx) {
                        this.ctx = ctx;
                }
 
                @Override
                public void collect(OUT record) {
-                       if (endOfStreamSignalled || schema.isEndOfStream(record)) {
-                               this.endOfStreamSignalled = true;
+                       Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" +
+                               "calling this method !");
+
+                       if (!preCheckFlag) {
                                return;
                        }
 
+                       if (isEndOfStream(record)) {
+                               this.endOfStreamSignalled = true;
+                               return;
+                       }
                        ctx.collect(record);
                }
 
+               public void collect(List<OUT> records) {
+                       Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" +
+                               "calling this method !");
+
+                       if (!preCheckFlag) {
+                               return;
+                       }
+
+                       for (OUT record : records){
+                               if (isEndOfStream(record)) {
+                                       this.endOfStreamSignalled = true;
+                                       return;
+                               }
+                               ctx.collect(record);
+                       }
+               }
+
+               public void setMessageIdentifiers(String correlationId, long deliveryTag){
+                       preCheckFlag = true;
+                       if (!autoAck) {
+                               if (usesCorrelationId) {
+                                       Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
+                                               "with usesCorrelationId set to true yet we couldn't extract the correlation id from it !");
+                                       if (!addId(correlationId)) {
+                                               // we have already processed this message
+                                               preCheckFlag = false;
+                                       }
+                               }
+                               sessionIds.add(deliveryTag);
+                       }
+               }

Review comment:
       What do you think about keeping this as an internal method that just sets the current message's correlationId and tag? We could then call it from the `collect(String correlationId, long deliveryTag, OUT record)` method.
   
   ```java
   /**
    * Set temporary state for the current message.
    *
    * @param correlationId the message's correlation ID, if used.
    * @param deliveryTag the message's delivery tag.
    */
   void setMessageIdentifiers(String correlationId, long deliveryTag) {
       // assumes currentCorrelationId (String) and currentDeliveryTag (Long) fields, cleared by reset()
       // could make these checks nicer/easier to read
       Preconditions.checkArgument(
           !usesCorrelationId || currentCorrelationId == null || currentCorrelationId.equals(correlationId),
           "Cannot use different correlation IDs for the same message");
       Preconditions.checkArgument(
           currentDeliveryTag == null || currentDeliveryTag == deliveryTag,
           "Cannot use different delivery tags for the same message");
       currentCorrelationId = correlationId;
       currentDeliveryTag = deliveryTag;
   }
   ```
   
   
   And then, if we go with the individual-record approach, we could do something like:
   
   ```java
   @Override
   public void collect(String correlationId, long deliveryTag, OUT record) {
       setMessageIdentifiers(correlationId, deliveryTag);
       if (!autoAck) {
           if (usesCorrelationId) {
               // could be moved into setMessageIdentifiers
               Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
                   "with usesCorrelationId set to true yet we couldn't extract the correlation id from it !");
               if (!addId(correlationId)) {
                   // we have already processed this message
                   return;
               }
           }
           sessionIds.add(deliveryTag);
       }
       if (isEndOfStream(record)) {
           this.endOfStreamSignalled = true;
           return;
       }

       ctx.collect(record);
   }
   ```
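   One thing to double-check with the per-record version: if a schema calls `collect` more than once for the same delivery, the second call runs `addId(correlationId)` again, and since that ID was just added it would presumably return early and drop the extra records. A minimal sketch of running the identifier checks only once per message, assuming `reset()` is called before each delivery; `DedupCollector` is hypothetical, a `HashSet` stands in for `addId`, and end-of-stream handling is left out:

   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical sketch; field names mirror the review snippet but this is not the connector's code.
   final class DedupCollector<T> {
       private final boolean autoAck;
       private final boolean usesCorrelationId;
       // stands in for addId(...) and the IDs it tracks
       private final Set<String> seenCorrelationIds = new HashSet<>();
       final List<Long> sessionIds = new ArrayList<>();
       final List<T> emitted = new ArrayList<>();

       private boolean identifiersChecked;
       private boolean skipCurrentMessage;

       DedupCollector(boolean autoAck, boolean usesCorrelationId) {
           this.autoAck = autoAck;
           this.usesCorrelationId = usesCorrelationId;
       }

       // Called before each delivery is handed to the schema.
       void reset() {
           identifiersChecked = false;
           skipCurrentMessage = false;
       }

       void collect(String correlationId, long deliveryTag, T record) {
           if (!identifiersChecked) {
               // dedup and session bookkeeping happen once per message, not once per record
               identifiersChecked = true;
               if (!autoAck) {
                   if (usesCorrelationId) {
                       if (correlationId == null) {
                           throw new NullPointerException("usesCorrelationId is set but the message has no correlation id");
                       }
                       skipCurrentMessage = !seenCorrelationIds.add(correlationId);
                   }
                   if (!skipCurrentMessage) {
                       sessionIds.add(deliveryTag);
                   }
               }
           }
           if (skipCurrentMessage) {
               // we have already processed this message
               return;
           }
           emitted.add(record);
       }
   }
   ```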
   

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

Review comment:
       We might also want to support the `open` method of the standard schema:
   
   ```java
   /**
    * Initialization method for the schema. It is called before the actual working methods
    * {@link #deserialize} and thus suitable for one time setup work.
    *
    * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g.
    * registering user metrics.
    *
    * @param context Contextual information that can be used during initialization.
    */
   @PublicEvolving
   default void open(DeserializationSchema.InitializationContext context) throws Exception {
   }
   ```
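   Since `open` would be a `default` method, existing `RMQDeserializationSchema` implementations would keep compiling, and the wrapper for a standard `DeserializationSchema` can simply forward the call. A self-contained sketch of both points, where `InitContext`, `OpenableSchema`, `ForwardingSchema`, and `CountingSchema` are hypothetical stand-ins for `DeserializationSchema.InitializationContext`, the RMQ schema, and the wrapper suggested earlier in this review:

   ```java
   // Hypothetical stand-in for DeserializationSchema.InitializationContext.
   interface InitContext {
       String name();
   }

   interface OpenableSchema<T> {
       // No-op by default, so implementations written before open() existed still compile.
       default void open(InitContext context) throws Exception {}

       T deserialize(byte[] body);
   }

   // Forwards open() to the wrapped schema.
   final class ForwardingSchema<T> implements OpenableSchema<T> {
       private final OpenableSchema<T> inner;

       ForwardingSchema(OpenableSchema<T> inner) {
           this.inner = inner;
       }

       @Override
       public void open(InitContext context) throws Exception {
           inner.open(context);
       }

       @Override
       public T deserialize(byte[] body) {
           return inner.deserialize(body);
       }
   }

   // Counts open() calls so forwarding can be observed.
   final class CountingSchema implements OpenableSchema<Integer> {
       int opened;

       @Override
       public void open(InitContext context) {
           opened++;
       }

       @Override
       public Integer deserialize(byte[] body) {
           return body.length;
       }
   }
   ```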




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

