dawidwys commented on a change in pull request #12056: URL: https://github.com/apache/flink/pull/12056#discussion_r499658806
########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery. + * @param <T> The output type of the {@link RMQSource} + */ +public abstract class RMQDeserializationSchema<T> implements Serializable, ResultTypeQueryable<T> { Review comment: I did not mean to change this class to an abstract class. 
You can have a default implementation in an interface: ``` interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { default void open(DeserializationSchema.InitializationContext context) throws Exception { } } ``` Moreover, if you change it to an abstract class, the comment about removing `public` modifiers from methods no longer holds. The `public` modifier is implicit (and therefore optional) only for interface methods; in a class, omitting it makes the methods package-private, so user implementations outside this package could not override them. ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ########## @@ -268,54 +304,141 @@ public void run(SourceContext<OUT> ctx) throws Exception { } } - private class RMQCollector implements Collector<OUT> { + @Override + public void cancel() { + running = false; + } + + @Override + protected void acknowledgeSessionIDs(List<Long> sessionIds) { + try { + for (long id : sessionIds) { + channel.basicAck(id, false); + } + channel.txCommit(); + } catch (IOException e) { + throw new RuntimeException("Messages could not be acknowledged during checkpoint creation.", e); + } + } + + @Override + public TypeInformation<OUT> getProducedType() { + return deliveryDeserializer.getProducedType(); + } + /** + * Special collector for RMQ messages. + * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been + * processed or not.
+ */ + private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> { Review comment: What do you think about this implementation of the collector: ``` private void processMessage(Delivery delivery, RMQCollectorImpl collector) throws IOException { AMQP.BasicProperties properties = delivery.getProperties(); byte[] body = delivery.getBody(); Envelope envelope = delivery.getEnvelope(); collector.setFallBackIdentifiers(properties.getCorrelationId(), envelope.getDeliveryTag()); deliveryDeserializer.deserialize(envelope, properties, body, collector); } /** * Special collector for RMQ messages. * Captures the correlation ID and delivery tag and also does the filtering logic for whether a message has been * processed or not. */ private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> { private final SourceContext<OUT> ctx; private boolean endOfStreamSignalled = false; private String correlationId; private long deliveryTag; private boolean customIdentifiersSet = false; private RMQCollectorImpl(SourceContext<OUT> ctx) { this.ctx = ctx; } @Override public void collect(OUT record) { if (!customIdentifiersSet) { boolean newMessage = setMessageIdentifiers(correlationId, deliveryTag); if (!newMessage) { return; } } if (isEndOfStream(record)) { this.endOfStreamSignalled = true; return; } ctx.collect(record); } public void setFallBackIdentifiers(String correlationId, long deliveryTag){ this.correlationId = correlationId; this.deliveryTag = deliveryTag; this.customIdentifiersSet = false; } @Override public boolean setMessageIdentifiers(String correlationId, long deliveryTag){ if (customIdentifiersSet) { throw new IllegalStateException("You can set only a single set of identifiers for a block of messages."); } this.customIdentifiersSet = true; if (!autoAck) { if (usesCorrelationId) { Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " + "with usesCorrelationId set to true yet we couldn't extract the correlation
id from it !"); if (!addId(correlationId)) { // we have already processed this message return false; } } sessionIds.add(deliveryTag); } return true; } boolean isEndOfStream(OUT record) { return endOfStreamSignalled || deliveryDeserializer.isEndOfStream(record); } public boolean isEndOfStreamSignalled() { return endOfStreamSignalled; } @Override public void close() { } } ``` ########## File path: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ########## @@ -318,6 +320,36 @@ public void testConstructorParams() throws Exception { assertEquals("passTest", testObj.getFactory().getPassword()); } + /** + * Tests getting the correct body and correlationID given which constructor was called. + * if the constructor with the {@link DeserializationSchema} was called it should extract the body of the message + * from the {@link Delivery} and the correlation ID from the {@link AMQP.BasicProperties} which are + * mocked to "I Love Turtles" and "0". + * if the constructor with the {@link RMQDeserializationSchema} was called it uses the + * {@link RMQDeserializationSchema#deserialize} method to parse the message and extract the correlation ID which + * both are implemented in {@link RMQTestSource#initAMQPMocks()} to return the + * {@link AMQP.BasicProperties#getMessageId()} that is mocked to return "1-MESSAGE_ID" + */ + @Test + public void testProcessMessage() throws Exception { Review comment: Now without the warnings I took a closer look at the test and I thought maybe we could improve it a bit? I do prefer blackbox kind of tests. 
What do you think about a test like follows: ``` @Test(timeout = 10000L) public void testCustomIdentifiers() throws Exception { source = new RMQTestSource(new CustomDeserializationSchema()); sourceThread.start(); source.initializeState(getMockContext()); source.open(config); sourceThread.join(); assertThat(DummySourceContext.numElementsCollected, equalTo(2L)); } private static class CustomDeserializationSchema implements RMQDeserializationSchema<String> { @Override public TypeInformation<String> getProducedType() { return TypeExtractor.getForClass(String.class); } @Override public void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<String> collector){ String correlationId = properties.getCorrelationId(); if (correlationId.equals("1")) { collector.setMessageIdentifiers("1", envelope.getDeliveryTag()); collector.collect("I Love Turtles"); collector.collect("Brush your teeth"); } else if (correlationId.equals("2")) { // should not be emitted, because it has the same correlationId as the previous one collector.setMessageIdentifiers("1", envelope.getDeliveryTag()); collector.collect("Brush your teeth"); } else { // should end the stream, should not be emitted collector.setMessageIdentifiers("2", envelope.getDeliveryTag()); collector.collect("FINISH"); } } @Override public boolean isEndOfStream(String record) { return record.equals("FINISH"); } } ``` ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery. + * @param <T> The output type of the {@link RMQSource} + */ +public abstract class RMQDeserializationSchema<T> implements Serializable, ResultTypeQueryable<T> { + + /** + * Initialization method for the schema. It is called before the actual working methods + * {@link #deserialize} and thus suitable for one time setup work. + * + * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features + * such as e.g. + * registering user metrics. + * + * @param context Contextual information that can be used during initialization. + */ + void open(DeserializationSchema.InitializationContext context) throws Exception { + + } + + /** + * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to + * the collector. + * + * <p><b>NOTICE:</b> The implementation of this method can call {@link RMQCollector#setMessageIdentifiers} with + * a custom correlation ID and delivery tag if checkpointing and UseCorrelationID (in the RMQSource constructor) + * were enabled + * the {@link RMQSource}. 
+ * @param envelope an AMQP {@link Envelope}. + * @param properties the {@link AMQP.BasicProperties} of the message. + * @param body the message itself as a byte array. + * @param collector the {@link RMQCollector} that will collect the data. + * @throws IOException When the body of the message can't be parsed + */ + abstract void deserialize(Envelope envelope, + AMQP.BasicProperties properties, + byte[] body, + RMQCollector<T> collector) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. + */ + abstract boolean isEndOfStream(T nextElement); + + /** + * Special collector for RMQ messages. + * + * <p>It extends the {@link Collector} to give the ability to collect more than 1 message and the ability to set + * the message correlationId and deliveryTag. + */ + public interface RMQCollector<T> extends Collector<T> { + void collect(List<T> records); Review comment: Sorry I missed this method before. I don't think we should have it. It should be possible to collect element by element without creating a list up front. The current behaviour differs a lot from other schemas. Right now you can not emit multiple records via the `#collect` because they will be treated as duplicated records. ########## File path: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ########## @@ -219,7 +221,7 @@ public void testCheckpointing() throws Exception { assertEquals(numIds, messageIds.size()); if (messageIds.size() > 0) { - assertTrue(messageIds.contains(Long.toString(lastSnapshotId))); + assertTrue(messageIds.contains(Long.toString(lastSnapshotId - 1))); Review comment: Thanks for the explanation. 
Unfortunately the testing setup is not too developer friendly to say the least. ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery. + * @param <T> The output type of the {@link RMQSource} + */ +public abstract class RMQDeserializationSchema<T> implements Serializable, ResultTypeQueryable<T> { + + /** + * Initialization method for the schema. It is called before the actual working methods + * {@link #deserialize} and thus suitable for one time setup work. 
+ * + * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features + * such as e.g. + * registering user metrics. + * + * @param context Contextual information that can be used during initialization. + */ + void open(DeserializationSchema.InitializationContext context) throws Exception { + + } + + /** + * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to + * the collector. + * + * <p><b>NOTICE:</b> The implementation of this method can call {@link RMQCollector#setMessageIdentifiers} with + * a custom correlation ID and delivery tag if checkpointing and UseCorrelationID (in the RMQSource constructor) + * were enabled + * the {@link RMQSource}. + * @param envelope an AMQP {@link Envelope}. + * @param properties the {@link AMQP.BasicProperties} of the message. + * @param body the message itself as a byte array. + * @param collector the {@link RMQCollector} that will collect the data. + * @throws IOException When the body of the message can't be parsed + */ + abstract void deserialize(Envelope envelope, + AMQP.BasicProperties properties, + byte[] body, + RMQCollector<T> collector) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. + */ + abstract boolean isEndOfStream(T nextElement); + + /** + * Special collector for RMQ messages. + * + * <p>It extends the {@link Collector} to give the ability to collect more than 1 message and the ability to set + * the message correlationId and deliveryTag. 
+ */ + public interface RMQCollector<T> extends Collector<T> { + void collect(List<T> records); + + void setMessageIdentifiers(String correlationId, long deliveryTag); Review comment: Could you add a javadoc to this method? I think it is quite important to do so. Moreover, I personally would appreciate an explanation of the difference between `correlationId` and `deliveryTag`: I am not too familiar with RMQ and don't know what the difference is. ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery.
+ * @param <T> The output type of the {@link RMQSource} + */ +public abstract class RMQDeserializationSchema<T> implements Serializable, ResultTypeQueryable<T> { + + /** + * Initialization method for the schema. It is called before the actual working methods + * {@link #deserialize} and thus suitable for one time setup work. + * + * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features + * such as e.g. + * registering user metrics. + * + * @param context Contextual information that can be used during initialization. + */ + void open(DeserializationSchema.InitializationContext context) throws Exception { + + } + + /** + * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to + * the collector. + * + * <p><b>NOTICE:</b> The implementation of this method can call {@link RMQCollector#setMessageIdentifiers} with + * a custom correlation ID and delivery tag if checkpointing and UseCorrelationID (in the RMQSource constructor) + * were enabled + * the {@link RMQSource}. + * @param envelope an AMQP {@link Envelope}. + * @param properties the {@link AMQP.BasicProperties} of the message. + * @param body the message itself as a byte array. + * @param collector the {@link RMQCollector} that will collect the data. + * @throws IOException When the body of the message can't be parsed + */ + abstract void deserialize(Envelope envelope, + AMQP.BasicProperties properties, + byte[] body, + RMQCollector<T> collector) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. + */ + abstract boolean isEndOfStream(T nextElement); + + /** + * Special collector for RMQ messages. 
+ * + * <p>It extends the {@link Collector} to give the ability to collect more than 1 message and the ability to set + * the message correlationId and deliveryTag. + */ + public interface RMQCollector<T> extends Collector<T> { + void collect(List<T> records); + + void setMessageIdentifiers(String correlationId, long deliveryTag); Review comment: I also thought we could add `boolean` as the result type, to indicate whether we have already seen this message. That way the schema could potentially skip deserializing the record, as we do in the current master. (There we do not even call `deserialize` if the message is a duplicate, but we probably cannot achieve that with the Collector.) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
