dawidwys commented on a change in pull request #12056:
URL: https://github.com/apache/flink/pull/12056#discussion_r499658806



##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.rabbitmq;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.util.Collector;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+
+/**
+ * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource}
+ */
+public abstract class RMQDeserializationSchema<T> implements  Serializable, ResultTypeQueryable<T> {

Review comment:
       I did not mean to change this class to an abstract class. You can have a default implementation in an interface:
   
   ```
   public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
   
        default void open(DeserializationSchema.InitializationContext context) throws Exception {
   
        }
   
        // deserialize(...) and isEndOfStream(...) stay abstract, without the public modifier
   }
   ```
   
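   Moreover, if you change it to an abstract class, my earlier comment about removing the `public` modifiers from the methods no longer applies. Only interface methods are implicitly `public`; in an abstract class, methods without a modifier are package-private, so user schemas outside `org.apache.flink.streaming.connectors.rabbitmq` could not implement them.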
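   
   A minimal sketch of the interface variant that compiles without Flink on the classpath. `InitContext` is a hypothetical stand-in for `DeserializationSchema.InitializationContext`, `RMQSchemaSketch` and `Utf8Schema` are hypothetical names, and `deserialize` is simplified to return a single value instead of writing to a collector:
   
   ```java
   import java.io.Serializable;
   import java.nio.charset.StandardCharsets;

   // Hypothetical stand-in for DeserializationSchema.InitializationContext.
   interface InitContext {
       String taskName();
   }

   // Schema as an interface: open() has a default no-op body, so
   // implementations only override it when they need one-time setup.
   interface RMQSchemaSketch<T> extends Serializable {

       default void open(InitContext context) throws Exception {
           // no-op by default
       }

       // Interface methods are implicitly public and abstract.
       T deserialize(byte[] body);

       boolean isEndOfStream(T nextElement);
   }

   // A schema that relies on the default open().
   class Utf8Schema implements RMQSchemaSketch<String> {

       @Override
       public String deserialize(byte[] body) {
           return new String(body, StandardCharsets.UTF_8);
       }

       @Override
       public boolean isEndOfStream(String nextElement) {
           return "FINISH".equals(nextElement);
       }
   }
   ```
   
   `Utf8Schema` never mentions `open`, yet calling it is still valid because the interface supplies the no-op body.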

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -268,54 +304,141 @@ public void run(SourceContext<OUT> ctx) throws Exception {
                }
        }
 
-       private class RMQCollector implements Collector<OUT> {
+       @Override
+       public void cancel() {
+               running = false;
+       }
+
+       @Override
+       protected void acknowledgeSessionIDs(List<Long> sessionIds) {
+               try {
+                       for (long id : sessionIds) {
+                               channel.basicAck(id, false);
+                       }
+                       channel.txCommit();
+               } catch (IOException e) {
+                       throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e);
+               }
+       }
+
+       @Override
+       public TypeInformation<OUT> getProducedType() {
+               return deliveryDeserializer.getProducedType();
+       }
 
+       /**
+        * Special collector for RMQ messages.
+        * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been
+        * processed or not.
+        */
+       private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {

Review comment:
       What do you think about this implementation of the collector:
   
   ```
        private void processMessage(Delivery delivery, RMQCollectorImpl collector)
                throws IOException {
                AMQP.BasicProperties properties = delivery.getProperties();
                byte[] body = delivery.getBody();
                Envelope envelope = delivery.getEnvelope();
                collector.setFallBackIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag());
                deliveryDeserializer.deserialize(envelope, properties, body, collector);
        }
   
   
        /**
         * Special collector for RMQ messages.
         * Captures the correlation ID and delivery tag and also does the filtering logic for whether a message has been
         * processed or not.
         */
        private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> {
                private final SourceContext<OUT> ctx;
                private boolean endOfStreamSignalled = false;
                private String correlationId;
                private long deliveryTag;
                private boolean customIdentifiersSet = false;
                // true if the current delivery was already processed; all of its records are dropped
                private boolean duplicate = false;
   
                private RMQCollectorImpl(SourceContext<OUT> ctx) {
                        this.ctx = ctx;
                }
   
                @Override
                public void collect(OUT record) {
                        if (!customIdentifiersSet) {
                                // the schema did not set custom identifiers, so fall back to the delivery's own
                                setMessageIdentifiers(correlationId, deliveryTag);
                        }
   
                        if (duplicate) {
                                // drop every record of an already processed message, not just the first one
                                return;
                        }
   
                        if (isEndOfStream(record)) {
                                this.endOfStreamSignalled = true;
                                return;
                        }
                        ctx.collect(record);
                }
   
                public void setFallBackIdentifiers(String correlationId, long deliveryTag) {
                        this.correlationId = correlationId;
                        this.deliveryTag = deliveryTag;
                        this.customIdentifiersSet = false;
                        this.duplicate = false;
                }
   
                @Override
                public boolean setMessageIdentifiers(String correlationId, long deliveryTag) {
                        if (customIdentifiersSet) {
                                throw new IllegalStateException("You can set only a single set of identifiers for a block of messages.");
                        }
   
                        this.customIdentifiersSet = true;
                        if (!autoAck) {
                                if (usesCorrelationId) {
                                        Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " +
                                                "with usesCorrelationId set to true yet we couldn't extract the correlation id from it!");
                                        if (!addId(correlationId)) {
                                                // we have already processed this message
                                                this.duplicate = true;
                                                return false;
                                        }
                                }
                                sessionIds.add(deliveryTag);
                        }
                        return true;
                }
   
                boolean isEndOfStream(OUT record) {
                        return endOfStreamSignalled || deliveryDeserializer.isEndOfStream(record);
                }
   
                public boolean isEndOfStreamSignalled() {
                        return endOfStreamSignalled;
                }
   
                @Override
                public void close() {
   
                }
        }
   ```
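   
   To check the intended filtering behaviour in isolation, here is a simplified, self-contained model of the collector logic. `DedupCollector` is a hypothetical name; its `emitted` list, `seenIds` set and `sessionIds` list stand in for `SourceContext`, `addId` and the source's `sessionIds`, and it assumes `autoAck` is off and correlation IDs are in use. It remembers the outcome of the identifier check for the whole delivery, so every record of an already seen delivery is dropped:
   
   ```java
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Simplified model of RMQCollectorImpl (assumes autoAck == false, usesCorrelationId == true).
   class DedupCollector<T> {
       final List<T> emitted = new ArrayList<>();           // stands in for SourceContext#collect
       final List<Long> sessionIds = new ArrayList<>();     // delivery tags to acknowledge later
       private final Set<String> seenIds = new HashSet<>(); // stands in for addId(...)

       private String fallbackCorrelationId;
       private long fallbackDeliveryTag;
       private boolean identifiersSet;
       private boolean duplicate;

       // Called by the source before handing a delivery to the schema.
       void setFallBackIdentifiers(String correlationId, long deliveryTag) {
           this.fallbackCorrelationId = correlationId;
           this.fallbackDeliveryTag = deliveryTag;
           this.identifiersSet = false;
           this.duplicate = false;
       }

       // May be called once per delivery; returns false for an already seen message.
       boolean setMessageIdentifiers(String correlationId, long deliveryTag) {
           if (identifiersSet) {
               throw new IllegalStateException("Identifiers can be set only once per delivery.");
           }
           identifiersSet = true;
           if (!seenIds.add(correlationId)) {
               duplicate = true; // applies to all records of this delivery
               return false;
           }
           sessionIds.add(deliveryTag);
           return true;
       }

       void collect(T record) {
           if (!identifiersSet) {
               // no custom identifiers: use the delivery's own ones
               setMessageIdentifiers(fallbackCorrelationId, fallbackDeliveryTag);
           }
           if (!duplicate) {
               emitted.add(record);
           }
       }
   }
   ```
   
   Redelivering correlation ID "1" with a new delivery tag emits nothing and adds no delivery tag to `sessionIds`, the same as the `addId` branch in the snippet above.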
   

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -318,6 +320,36 @@ public void testConstructorParams() throws Exception {
                assertEquals("passTest", testObj.getFactory().getPassword());
        }
 
+       /**
+        * Tests getting the correct body and correlationID given which constructor was called.
+        * if the constructor with the {@link DeserializationSchema} was called it should extract the body of the message
+        * from the {@link Delivery} and the correlation ID from the {@link AMQP.BasicProperties} which are
+        * mocked to "I Love Turtles" and "0".
+        * if the constructor with the {@link RMQDeserializationSchema} was called it uses the
+        * {@link RMQDeserializationSchema#deserialize} method to parse the message and extract the correlation ID which
+        * both are implemented in {@link RMQTestSource#initAMQPMocks()} to return the
+        * {@link AMQP.BasicProperties#getMessageId()} that is mocked to return "1-MESSAGE_ID"
+        */
+       @Test
+       public void testProcessMessage() throws Exception {

Review comment:
       Now that the warnings are gone, I took a closer look at the test, and I think we could improve it a bit. I prefer black-box tests. What do you think about a test like the following:
   
   ```
        @Test(timeout = 10000L)
        public void testCustomIdentifiers() throws Exception {
                source = new RMQTestSource(new CustomDeserializationSchema());
                sourceThread.start();
                source.initializeState(getMockContext());
                source.open(config);
                sourceThread.join();
   
                assertThat(DummySourceContext.numElementsCollected, equalTo(2L));
        }
   
        private static class CustomDeserializationSchema implements RMQDeserializationSchema<String> {
                @Override
                public TypeInformation<String> getProducedType() {
                        return TypeExtractor.getForClass(String.class);
                }
   
                @Override
                public void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<String> collector) {
                        String correlationId = properties.getCorrelationId();
                        if (correlationId.equals("1")) {
                                collector.setMessageIdentifiers("1", envelope.getDeliveryTag());
                                collector.collect("I Love Turtles");
                                collector.collect("Brush your teeth");
                        } else if (correlationId.equals("2")) {
                                // should not be emitted, because it has the same correlationId as the previous one
                                collector.setMessageIdentifiers("1", envelope.getDeliveryTag());
                                collector.collect("Brush your teeth");
                        } else {
                                // should end the stream, should not be emitted
                                collector.setMessageIdentifiers("2", envelope.getDeliveryTag());
                                collector.collect("FINISH");
                        }
                }
   
                @Override
                public boolean isEndOfStream(String record) {
                        return record.equals("FINISH");
                }
        }
   ```

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,90 @@
+public abstract class RMQDeserializationSchema<T> implements  Serializable, ResultTypeQueryable<T> {
+
+       /**
+        * Initialization method for the schema. It is called before the actual working methods
+        * {@link #deserialize} and thus suitable for one time setup work.
+        *
+        * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features
+        * such as e.g.
+        * registering user metrics.
+        *
+        * @param context Contextual information that can be used during initialization.
+        */
+       void open(DeserializationSchema.InitializationContext context) throws Exception {
+
+       }
+
+       /**
+        * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to
+        * the collector.
+        *
+        * <p><b>NOTICE:</b> The implementation of this method can call {@link RMQCollector#setMessageIdentifiers} with
+        * a custom correlation ID and delivery tag if checkpointing and UseCorrelationID (in the RMQSource constructor)
+        * were enabled
+        * the {@link RMQSource}.
+        * @param envelope an AMQP {@link Envelope}.
+        * @param properties the {@link AMQP.BasicProperties} of the message.
+        * @param body the message itself as a byte array.
+        * @param collector the {@link RMQCollector} that will collect the data.
+        * @throws IOException When the body of the message can't be parsed
+        */
+       abstract void deserialize(Envelope envelope,
+                                                               AMQP.BasicProperties properties,
+                                                               byte[] body,
+                                                               RMQCollector<T> collector) throws IOException;
+
+       /**
+        * Method to decide whether the element signals the end of the stream. If
+        * true is returned the element won't be emitted.
+        *
+        * @param nextElement The element to test for the end-of-stream signal.
+        * @return True, if the element signals end of stream, false otherwise.
+        */
+       abstract boolean isEndOfStream(T nextElement);
+
+       /**
+        * Special collector for RMQ messages.
+        *
+        * <p>It extends the {@link Collector} to give the ability to collect more than 1 message and the ability to set
+        * the message correlationId and deliveryTag.
+        */
+       public interface RMQCollector<T> extends Collector<T> {
+               void collect(List<T> records);

Review comment:
       Sorry, I missed this method before. I don't think we should have it: it should be possible to collect element by element without creating a list up front, and the current behaviour differs a lot from other schemas. Right now you cannot emit multiple records via `#collect`, because they will be treated as duplicate records.
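   
   For comparison, a sketch of a schema that emits several records from one delivery with plain `collect(T)` calls, the pattern other Flink schemas follow. `SimpleCollector` (a stand-in mirroring Flink's `Collector`), `LineSplittingSchema` and `ListCollector` are hypothetical names. This only works if the source-side collector deduplicates per delivery rather than per record:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical stand-in mirroring org.apache.flink.util.Collector.
   interface SimpleCollector<T> {
       void collect(T record);

       void close();
   }

   // Emits one record per non-empty line of the message body,
   // element by element, without building a List first.
   class LineSplittingSchema {
       void deserialize(byte[] body, SimpleCollector<String> out) {
           String text = new String(body, StandardCharsets.UTF_8);
           for (String line : text.split("\n")) {
               if (!line.isEmpty()) {
                   out.collect(line); // each call emits one record of the same delivery
               }
           }
       }
   }

   // Collects records into a list, e.g. for tests.
   class ListCollector<T> implements SimpleCollector<T> {
       final List<T> records = new ArrayList<>();

       @Override
       public void collect(T record) {
           records.add(record);
       }

       @Override
       public void close() {
       }
   }
   ```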

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -219,7 +221,7 @@ public void testCheckpointing() throws Exception {
 
                        assertEquals(numIds, messageIds.size());
                        if (messageIds.size() > 0) {
-                               assertTrue(messageIds.contains(Long.toString(lastSnapshotId)));
+                               assertTrue(messageIds.contains(Long.toString(lastSnapshotId - 1)));

Review comment:
       Thanks for the explanation. Unfortunately, the testing setup is not very developer-friendly, to say the least.

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,90 @@
+       public interface RMQCollector<T> extends Collector<T> {
+               void collect(List<T> records);
+
+               void setMessageIdentifiers(String correlationId, long deliveryTag);

Review comment:
       Could you add a Javadoc to this method? I think it is quite important. I would also personally appreciate an explanation of the difference between `correlationId` and `deliveryTag`; I am not too familiar with RMQ and don't know what the difference is.
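   
   A sketch of what such a Javadoc could cover, assuming standard AMQP 0-9-1 semantics. `RMQCollectorDoc` is a hypothetical name, the interface is trimmed to the two relevant methods, and the `boolean` return type follows the collector snippet suggested earlier in this review:
   
   ```java
   // Hypothetical, trimmed-down collector interface used only to show the Javadoc.
   interface RMQCollectorDoc<T> {

       /** Emits one record of the current delivery. */
       void collect(T record);

       /**
        * Sets the identifiers used for exactly-once processing of the current delivery.
        * Can be called at most once per delivery, before the first {@link #collect}.
        *
        * <p>{@code correlationId} is an application-level property that the publisher sets on the
        * message (see {@code AMQP.BasicProperties#getCorrelationId()}); the broker does not interpret
        * it. With checkpointing and {@code usesCorrelationId} enabled, the source uses it to recognize
        * messages that were already processed.
        *
        * <p>{@code deliveryTag} is assigned by the broker to each delivery and is unique only within
        * the channel; it is what the source acknowledges with {@code Channel#basicAck}. A redelivered
        * message gets a new delivery tag, so the tag alone cannot detect duplicates.
        *
        * @return {@code false} if a message with this correlation ID was already processed
        */
       boolean setMessageIdentifiers(String correlationId, long deliveryTag);
   }
   ```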

##########
File path: 
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java
##########
@@ -0,0 +1,90 @@
+       public interface RMQCollector<T> extends Collector<T> {
+               void collect(List<T> records);
+
+               void setMessageIdentifiers(String correlationId, long deliveryTag);

Review comment:
       I also thought we could add `boolean` as the return type, to indicate whether we have already seen this message. That way the schema could skip deserializing the record, as we do in the current master. (There we do not even call `deserialize` if the message is a duplicate, but we probably cannot achieve that with the Collector.)
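   
   A sketch of how a schema could use such a return value to skip parsing. `IdentifyingCollector`, `SkippingSchema` and `InMemoryCollector` are hypothetical, and `deserialize` takes the correlation ID and delivery tag directly instead of `Envelope` and `AMQP.BasicProperties` to keep the example self-contained:
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.ArrayList;
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical collector with the proposed boolean-returning setMessageIdentifiers.
   interface IdentifyingCollector<T> {
       boolean setMessageIdentifiers(String correlationId, long deliveryTag);

       void collect(T record);
   }

   // Checks the identifiers first and only parses the body of new messages.
   class SkippingSchema {
       int parsedBodies = 0; // counts how often the (possibly expensive) parsing ran

       void deserialize(String correlationId, long deliveryTag, byte[] body, IdentifyingCollector<String> out) {
           if (!out.setMessageIdentifiers(correlationId, deliveryTag)) {
               return; // already processed: skip deserialization entirely
           }
           parsedBodies++;
           out.collect(new String(body, StandardCharsets.UTF_8));
       }
   }

   // In-memory collector that remembers the correlation IDs it has seen.
   class InMemoryCollector implements IdentifyingCollector<String> {
       final Set<String> seen = new HashSet<>();
       final List<String> records = new ArrayList<>();

       @Override
       public boolean setMessageIdentifiers(String correlationId, long deliveryTag) {
           return seen.add(correlationId);
       }

       @Override
       public void collect(String record) {
           records.add(record);
       }
   }
   ```
   
   This avoids parsing only when the schema checks the identifiers before touching the body; unlike the current master, `deserialize` itself is still invoked for duplicates.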




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

