austince commented on a change in pull request #12056: URL: https://github.com/apache/flink/pull/12056#discussion_r440170857
########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery. + * @param <T> The output type of the {@link RMQSource} + */ +public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** + * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the + * collector. 
+ * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with + * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled + * the {@link RMQSource}. + * @param envelope + * @param properties + * @param body + * @throws IOException + */ + public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException; Review comment: I think this might be nicer as `deserialize`, to fit with other Deserialization Schema patterns. ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery. + * @param <T> The output type of the {@link RMQSource} + */ +public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** + * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the + * collector. + * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with + * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled + * the {@link RMQSource}. + * @param envelope + * @param properties + * @param body + * @throws IOException + */ + public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. + */ + public boolean isEndOfStream(T nextElement); + + /** + * The {@link TypeInformation} for the deserialized T. + * As an example the proper implementation of this method if T is a String is: + * {@code return TypeExtractor.getForClass(String.class)} + * @return TypeInformation + */ + public TypeInformation<T> getProducedType(); + + + /** + * Special collector for RMQ messages. 
+ * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been + * processed or not. + * @param <T> + */ + public interface RMQCollector<T> extends Collector<T> { + public void collect(List<T> records); + + public void setMessageIdentifiers(String correlationId, long deliveryTag); + } Review comment: What do you think about this interface instead? ```java interface RMQCollector<T> extends Collector<T> { /** * Called before a new message is processed in order to clear any temporary state. */ default void reset() {} void collect(String correlationId, long deliveryTag, T record); void collect(T record); } ``` Where instead of having the user call `setMessageIdentifiers`, we can call this ourselves from the implementation. We can also ensure the same `correlationId` and `deliveryTag` are used for a message by storing them in local state, and clear them when the next message comes with the `reset` method. This would follow the emitter pattern of other collectors a bit more and still allow users to call `collect` multiple times per message. ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ########## @@ -77,6 +78,7 @@ private final RMQConnectionConfig rmqConnectionConfig; protected final String queueName; private final boolean usesCorrelationId; + protected RMQDeserializationSchema<OUT> deliveryDeserializer; protected DeserializationSchema<OUT> schema; Review comment: I think it would be nice if we always used the more-specific `RMQDeserializationSchema`, and wrapped the standard `DeserializationSchema` when passed in the constructor. This would allow us to remove the null checks/ ternaries. 
We could wrap it like this: ```java static <T> RMQDeserializationSchema<T> wrapDeserializationSchema(DeserializationSchema<T> schema) { return new RMQDeserializationSchema<>() { @Override public void deserialize(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector<T> collector) throws IOException { final long deliveryTag = envelope.getDeliveryTag(); final String correlationId = properties.getCorrelationId(); collector.collect(correlationId, deliveryTag, schema.deserialize(body)); } @Override public TypeInformation<T> getProducedType() { return schema.getProducedType(); } @Override public boolean isEndOfStream(T nextElement) { return schema.isEndOfStream(nextElement); } @Override public void open(DeserializationSchema.InitializationContext context) throws Exception { schema.open(context); } }; } ``` ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery. + * @param <T> The output type of the {@link RMQSource} + */ +public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { + /** + * This method takes all the RabbitMQ delivery information supplied by the client extract the data and pass it to the + * collector. + * NOTICE: The implementation of this method MUST call {@link RMQCollector#setMessageIdentifiers(String, long)} with + * the correlation ID of the message if checkpointing and UseCorrelationID (in the RMQSource constructor) were enabled + * the {@link RMQSource}. + * @param envelope + * @param properties + * @param body + * @throws IOException + */ + public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. + */ + public boolean isEndOfStream(T nextElement); Review comment: nit - We should be able to remove the `public` access modifiers on the interface as well ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ########## @@ -208,29 +252,40 @@ public void close() throws Exception { } } + /** + * Parse and collects the body of the an AMQP message. 
+ * + * <p>If any of the constructors with the {@link DeserializationSchema} class was used to construct the source + * it uses the {@link DeserializationSchema#deserialize(byte[])} to parse the body of the AMQP message. + * + * <p>If any of the constructors with the {@link RMQDeserializationSchema } class was used to construct the source it uses the + * {@link RMQDeserializationSchema#processMessage(Envelope, AMQP.BasicProperties, byte[], RMQDeserializationSchema.RMQCollector collector)} + * method of that provided instance. + * + * @param delivery the AMQP {@link QueueingConsumer.Delivery} + * @param collector a {@link RMQCollectorImpl} to collect the data + * @throws IOException + */ + protected void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector collector) throws IOException { + AMQP.BasicProperties properties = delivery.getProperties(); + byte[] body = delivery.getBody(); + + if (deliveryDeserializer != null){ + Envelope envelope = delivery.getEnvelope(); + deliveryDeserializer.processMessage(envelope, properties, body, collector); + } else { + collector.setMessageIdentifiers(properties.getCorrelationId(), delivery.getEnvelope().getDeliveryTag()); + collector.collect(schema.deserialize(body)); + } + } Review comment: If we wrap the schema and go with the `reset` method, this could be simplified to: ```java protected void processMessage(QueueingConsumer.Delivery delivery, RMQDeserializationSchema.RMQCollector<OUT> collector) throws IOException { AMQP.BasicProperties properties = delivery.getProperties(); byte[] body = delivery.getBody(); Envelope envelope = delivery.getEnvelope(); // ensure the collector knows a new message is being processed collector.reset(); schema.deserialize(envelope, properties, body, collector); } ``` ########## File path: flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java ########## @@ -382,6 +414,27 @@ public boolean 
isEndOfStream(String nextElement) { } } + private class CustomDeserializationSchema implements RMQDeserializationSchema<String> { + @Override + public TypeInformation<String> getProducedType() { + return TypeExtractor.getForClass(String.class); + } + + @Override + public void processMessage(Envelope envelope, AMQP.BasicProperties properties, byte[] body, RMQCollector collector) throws IOException { + List<String> messages = new ArrayList(); + messages.add("I Love Turtles"); + messages.add("Brush your teeth"); Review comment: :paintbrush: :grimacing: ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java ########## @@ -239,25 +294,75 @@ public void run(SourceContext<OUT> ctx) throws Exception { } } - private class RMQCollector implements Collector<OUT> { - + /** + * Special collector for RMQ messages. + * Captures the correlation ID and delivery tag also does the filtering logic for weather a message has been + * processed or not. 
+ */ + private class RMQCollectorImpl implements RMQDeserializationSchema.RMQCollector<OUT> { private final SourceContext<OUT> ctx; private boolean endOfStreamSignalled = false; + private long deliveryTag; + private Boolean preCheckFlag; - private RMQCollector(SourceContext<OUT> ctx) { + private RMQCollectorImpl(SourceContext<OUT> ctx) { this.ctx = ctx; } @Override public void collect(OUT record) { - if (endOfStreamSignalled || schema.isEndOfStream(record)) { - this.endOfStreamSignalled = true; + Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" + + "calling this method !"); + + if (!preCheckFlag) { return; } + if (isEndOfStream(record)) { + this.endOfStreamSignalled = true; + return; + } ctx.collect(record); } + public void collect(List<OUT> records) { + Preconditions.checkNotNull(preCheckFlag, "setCorrelationID must be called at least once before" + + "calling this method !"); + + if (!preCheckFlag) { + return; + } + + for (OUT record : records){ + if (isEndOfStream(record)) { + this.endOfStreamSignalled = true; + return; + } + ctx.collect(record); + } + } + + public void setMessageIdentifiers(String correlationId, long deliveryTag){ + preCheckFlag = true; + if (!autoAck) { + if (usesCorrelationId) { + Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " + + "with usesCorrelationId set to true yet we couldn't extract the correlation id from it !"); + if (!addId(correlationId)) { + // we have already processed this message + preCheckFlag = false; + } + } + sessionIds.add(deliveryTag); + } + } Review comment: What do you think about keeping this as an internal method that just sets the current message's correlation ID and tag? And we could call it from the `collect(String correlationId, long deliveryTag, OUT record)` method? ```java /** * Set temporary state for the current message. * * @param correlationId the message's correlation ID, if used. * @param deliveryTag the message's delivery tag.
*/ void setMessageIdentifiers(String correlationId, long deliveryTag) { // could make these checks nicer/ easier to read Preconditions.checkArgument(!usesCorrelationId || correlationId == null || correlationId.equals(currentCorrelationId), "Cannot use different correlation IDs for the same message"); Preconditions.checkArgument(currentDeliveryTag == null || currentDeliveryTag == deliveryTag, "Cannot use different delivery tags for the same message"); currentCorrelationId = correlationId; currentDeliveryTag = deliveryTag; } ``` And then if going with the individual record approach, we could do something like: ```java @Override public void collect(String correlationId, long deliveryTag, OUT record) { setMessageIdentifiers(correlationId, deliveryTag); if (!autoAck) { if (usesCorrelationId) { // could be moved into setMessageIdentifiers Preconditions.checkNotNull(correlationId, "RabbitMQ source was instantiated " + "with usesCorrelationId set to true yet we couldn't extract the correlation id from it !"); if (!addId(correlationId)) { // we have already processed this message return; } } sessionIds.add(deliveryTag); } if (isEndOfStream(record)) { this.endOfStreamSignalled = true; return; } ctx.collect(record); } ``` ########## File path: flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQDeserializationSchema.java ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.rabbitmq; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.util.Collector; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Envelope; + +import java.io.IOException; +import java.io.Serializable; +import java.util.List; + +/** + * Interface for the set of methods required to parse an RMQ delivery. + * @param <T> The output type of the {@link RMQSource} + */ +public interface RMQDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> { Review comment: We might also want to support the `open` method of the standard schema: ```java /** * Initialization method for the schema. It is called before the actual working methods * {@link #deserialize} and thus suitable for one time setup work. * * <p>The provided {@link DeserializationSchema.InitializationContext} can be used to access additional features such as e.g. * registering user metrics. * * @param context Contextual information that can be used during initialization. */ @PublicEvolving default void open(DeserializationSchema.InitializationContext context) throws Exception { } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
