RocMarshal commented on a change in pull request #15140: URL: https://github.com/apache/flink/pull/15140#discussion_r774276762
########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * The source enumerator provides the source readers with the split. All source readers receive the + * same split as it only contains information about the connection and in case of exactly-once, the + * seen correlation ids. But in this case, the enumerator makes sure that at maximum one source + * reader receives the split. During exactly-once if multiple reader should be assigned a split a + * {@link RuntimeException} is thrown. + */ +public class RabbitMQSourceEnumerator + implements SplitEnumerator<RabbitMQSourceSplit, RabbitMQSourceEnumState> { + private final SplitEnumeratorContext<RabbitMQSourceSplit> context; + private final ConsistencyMode consistencyMode; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceEnumerator.class); + private RabbitMQSourceSplit split; + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName, + RabbitMQSourceEnumState enumState) { + // The enumState is not used since the enumerator has no state in this architecture. + this(context, consistencyMode, connectionConfig, rmqQueueName); + } + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName) { + this.context = requireNonNull(context); + this.consistencyMode = requireNonNull(consistencyMode); + this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName); + } + + @Override + public void start() { + LOG.info("Start RabbitMQ source enumerator"); + } + + @Override + public void handleSplitRequest(int i, @Nullable String s) { + LOG.info("Split request from reader " + i); + assignSplitToReader(i, split); + } + + @Override + public void addSplitsBack(List<RabbitMQSourceSplit> list, int i) { + if (list.size() == 0) { Review comment: ```suggestion if (list == null || list.size() == 0) { ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer; +import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides at-most-once, + * at-least-once or exactly-once processing semantics. For at-least-once and exactly-once, + * checkpointing needs to be enabled. + * + * <pre>{@code + * RabbitMQSink + * .builder() + * .setConnectionConfig(connectionConfig) + * .setQueueName("queue") + * .setSerializationSchema(new SimpleStringSchema()) + * .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE) + * .build(); + * }</pre> + * + * <p>When creating the sink a {@code connectionConfig} must be specified via {@link + * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to + * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a + * password and a port. Besides that, the {@code queueName} to publish to and a {@link + * SerializationSchema} for the sink input type is required. {@code publishOptions} can be added + * optionally to route messages in RabbitMQ. + * + * <p>If at-least-once is required messages are buffered until an acknowledgement arrives because + * delivery needs to be guaranteed. On each checkpoint, all unacknowledged messages will be resent + * to RabbitMQ. In case of a failure, all unacknowledged messages can be restored and resend. + * + * <p>In the case of exactly-once a transactional RabbitMQ channel is used to achieve that all + * messages within a checkpoint are delivered once and only once. All messages that arrive in a + * checkpoint interval are buffered and sent to RabbitMQ in a single transaction when the checkpoint + * is triggered. If the transaction fails, all messages that were a part of the transaction are put + * back into the buffer and a resend is issued in the next checkpoint. + * + * <p>Keep in mind that the transactional channels are heavyweight and the performance will drop. + * Under heavy load, checkpoints can be delayed if a transaction takes longer than the specified + * checkpointing interval. + * + * <p>If publish options are used and the checkpointing mode is at-least-once or exactly-once, they + * require a {@link DeserializationSchema} to be provided because messages that were persisted as + * part of an earlier checkpoint are needed to recompute routing/exchange. + */ +public class RabbitMQSink<T> implements Sink<T, Void, RabbitMQSinkWriterState<T>, Void> { + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private final SerializationSchema<T> serializationSchema; + private final RabbitMQSinkPublishOptions<T> publishOptions; + private final ConsistencyMode consistencyMode; + private final SerializableReturnListener returnListener; + + private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = ConsistencyMode.AT_MOST_ONCE; + + private RabbitMQSink( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema<T> serializationSchema, + ConsistencyMode consistencyMode, + @Nullable SerializableReturnListener returnListener, + @Nullable RabbitMQSinkPublishOptions<T> publishOptions) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.serializationSchema = requireNonNull(serializationSchema); + this.consistencyMode = requireNonNull(consistencyMode); + + this.returnListener = returnListener; + + Preconditions.checkState( + verifyPublishOptions(), + "If consistency mode is stronger than at-most-once and publish options are defined" + + "then publish options need a deserialization schema"); + this.publishOptions = publishOptions; + } + + private boolean verifyPublishOptions() { + // If at-most-once, doesnt matter if publish options are provided (no state in writer) Review comment: ```suggestion // If at-most-once, doesn't matter if publish options are provided (no state in writer) ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * The source enumerator provides the source readers with the split. All source readers receive the + * same split as it only contains information about the connection and in case of exactly-once, the + * seen correlation ids. But in this case, the enumerator makes sure that at maximum one source + * reader receives the split. During exactly-once if multiple reader should be assigned a split a + * {@link RuntimeException} is thrown. + */ +public class RabbitMQSourceEnumerator + implements SplitEnumerator<RabbitMQSourceSplit, RabbitMQSourceEnumState> { + private final SplitEnumeratorContext<RabbitMQSourceSplit> context; + private final ConsistencyMode consistencyMode; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceEnumerator.class); + private RabbitMQSourceSplit split; + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName, + RabbitMQSourceEnumState enumState) { + // The enumState is not used since the enumerator has no state in this architecture. + this(context, consistencyMode, connectionConfig, rmqQueueName); + } + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName) { + this.context = requireNonNull(context); + this.consistencyMode = requireNonNull(consistencyMode); + this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName); + } + + @Override + public void start() { + LOG.info("Start RabbitMQ source enumerator"); + } + + @Override + public void handleSplitRequest(int i, @Nullable String s) { + LOG.info("Split request from reader " + i); Review comment: ```suggestion LOG.info("Split request from reader {}.", i); ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSink.java ########## @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink.Committer; +import org.apache.flink.api.connector.sink.GlobalCommitter; +import org.apache.flink.api.connector.sink.Sink; +import org.apache.flink.api.connector.sink.SinkWriter; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.sink.common.RabbitMQSinkPublishOptions; +import org.apache.flink.connector.rabbitmq2.sink.common.SerializableReturnListener; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterState; +import org.apache.flink.connector.rabbitmq2.sink.state.RabbitMQSinkWriterStateSerializer; +import org.apache.flink.connector.rabbitmq2.sink.writer.RabbitMQSinkWriterBase; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtLeastOnce; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterAtMostOnce; +import org.apache.flink.connector.rabbitmq2.sink.writer.specialized.RabbitMQSinkWriterExactlyOnce; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQ sink that publishes messages to a RabbitMQ queue. It provides at-most-once, + * at-least-once or exactly-once processing semantics. For at-least-once and exactly-once, + * checkpointing needs to be enabled. + * + * <pre>{@code + * RabbitMQSink + * .builder() + * .setConnectionConfig(connectionConfig) + * .setQueueName("queue") + * .setSerializationSchema(new SimpleStringSchema()) + * .setConsistencyMode(ConsistencyMode.AT_LEAST_ONCE) + * .build(); + * }</pre> + * + * <p>When creating the sink a {@code connectionConfig} must be specified via {@link + * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to + * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a + * password and a port. Besides that, the {@code queueName} to publish to and a {@link + * SerializationSchema} for the sink input type is required. {@code publishOptions} can be added + * optionally to route messages in RabbitMQ. + * + * <p>If at-least-once is required messages are buffered until an acknowledgement arrives because + * delivery needs to be guaranteed. On each checkpoint, all unacknowledged messages will be resent + * to RabbitMQ. In case of a failure, all unacknowledged messages can be restored and resend. + * + * <p>In the case of exactly-once a transactional RabbitMQ channel is used to achieve that all + * messages within a checkpoint are delivered once and only once. All messages that arrive in a + * checkpoint interval are buffered and sent to RabbitMQ in a single transaction when the checkpoint + * is triggered. If the transaction fails, all messages that were a part of the transaction are put + * back into the buffer and a resend is issued in the next checkpoint. + * + * <p>Keep in mind that the transactional channels are heavyweight and the performance will drop. + * Under heavy load, checkpoints can be delayed if a transaction takes longer than the specified + * checkpointing interval. + * + * <p>If publish options are used and the checkpointing mode is at-least-once or exactly-once, they + * require a {@link DeserializationSchema} to be provided because messages that were persisted as + * part of an earlier checkpoint are needed to recompute routing/exchange. + */ +public class RabbitMQSink<T> implements Sink<T, Void, RabbitMQSinkWriterState<T>, Void> { + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private final SerializationSchema<T> serializationSchema; + private final RabbitMQSinkPublishOptions<T> publishOptions; + private final ConsistencyMode consistencyMode; + private final SerializableReturnListener returnListener; + + private static final ConsistencyMode DEFAULT_CONSISTENCY_MODE = ConsistencyMode.AT_MOST_ONCE; + + private RabbitMQSink( + RabbitMQConnectionConfig connectionConfig, + String queueName, + SerializationSchema<T> serializationSchema, + ConsistencyMode consistencyMode, + @Nullable SerializableReturnListener returnListener, + @Nullable RabbitMQSinkPublishOptions<T> publishOptions) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.serializationSchema = requireNonNull(serializationSchema); + this.consistencyMode = requireNonNull(consistencyMode); + + this.returnListener = returnListener; + + Preconditions.checkState( + verifyPublishOptions(), + "If consistency mode is stronger than at-most-once and publish options are defined" Review comment: ```suggestion "If consistency mode is stronger than at-most-once and publish options are defined " ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/pom.xml ########## @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId> + <name>Flink : Connectors : RabbitMQ2</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <rabbitmq.version>5.9.0</rabbitmq.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>${rabbitmq.version}</version> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>rabbitmq</artifactId> + <version>1.15.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> Review comment: ```suggestion <scope>test</scope> <type>test-jar</type> ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/RabbitMQSource.java ########## @@ -0,0 +1,310 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.connector.source.Source; +import org.apache.flink.api.connector.source.SourceReader; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumState; +import org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumStateSerializer; +import org.apache.flink.connector.rabbitmq2.source.enumerator.RabbitMQSourceEnumerator; +import org.apache.flink.connector.rabbitmq2.source.reader.RabbitMQSourceReaderBase; +import org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtLeastOnce; +import org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderAtMostOnce; +import org.apache.flink.connector.rabbitmq2.source.reader.specialized.RabbitMQSourceReaderExactlyOnce; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplitSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializer; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static java.util.Objects.requireNonNull; + +/** + * RabbitMQ source (consumer) that consumes messages from a RabbitMQ queue. It provides + * at-most-once, at-least-once and exactly-once processing semantics. For at-least-once and + * exactly-once, checkpointing needs to be enabled. The source operates as a StreamingSource and + * thus works in a streaming fashion. Please use a {@link RabbitMQSourceBuilder} to construct the + * source. The following example shows how to create a RabbitMQSource emitting records of <code> + * String</code> type. + * + * <pre>{@code + * RabbitMQSource<String> source = RabbitMQSource + * .<String>builder() + * .setConnectionConfig(MY_RMQ_CONNECTION_CONFIG) + * .setQueueName("myQueue") + * .setDeliveryDeserializer(new SimpleStringSchema()) + * .setConsistencyMode(MY_CONSISTENCY_MODE) + * .build(); + * }</pre> + * + * <p>When creating the source a {@code connectionConfig} must be specified via {@link + * RabbitMQConnectionConfig}. It contains required information for the RabbitMQ java client to + * connect to the RabbitMQ server. A minimum configuration contains a (virtual) host, a username, a + * password and a port. Besides that, the {@code queueName} to consume from and a {@link + * DeserializationSchema} + * + * <p>When using at-most-once consistency, messages are automatically acknowledged when received + * from RabbitMQ and later consumed by the output. In case of a failure, messages might be lost. + * More details in {@link RabbitMQSourceReaderAtMostOnce}. + * + * <p>In case of at-least-once consistency, message are buffered and later consumed by the output. + * Once a checkpoint is finished, the messages that were consumed by the output are acknowledged to + * RabbitMQ. This way, we ensure that the messages are successfully received by the output. In case + * of a system failure, the message that were acknowledged to RabbitMQ will be resend by RabbitMQ. + * More details in {@link RabbitMQSourceReaderAtLeastOnce}. + * + * <p>To ensure exactly-once consistency, messages are deduplicated through {@code correlationIds}. + * Similar to at-least-once consistency, we store the {@code deliveryTags} of the messages that are + * consumed by the output to acknowledge them later. A transactional RabbitMQ channel is used to + * ensure that all messages are successfully acknowledged to RabbitMQ. More details in {@link + * RabbitMQSourceReaderExactlyOnce}. + * + * <p>Keep in mind that the transactional channels are heavyweight and performance will drop. Under + * heavy load, checkpoints can be delayed if a transaction takes longer than the specified + * checkpointing interval. + * + * @param <T> the output type of the source. + */ +public class RabbitMQSource<T> + implements Source<T, RabbitMQSourceSplit, RabbitMQSourceEnumState>, ResultTypeQueryable<T> { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSource.class); + + private final RabbitMQConnectionConfig connectionConfig; + private final String queueName; + private final DeserializationSchema<T> deserializationSchema; + private final ConsistencyMode consistencyMode; + + private RabbitMQSource( + RabbitMQConnectionConfig connectionConfig, + String queueName, + DeserializationSchema<T> deserializationSchema, + ConsistencyMode consistencyMode) { + this.connectionConfig = requireNonNull(connectionConfig); + this.queueName = requireNonNull(queueName); + this.deserializationSchema = requireNonNull(deserializationSchema); + this.consistencyMode = requireNonNull(consistencyMode); + + LOG.info("Create RabbitMQ source"); + } + + /** + * Get a {@link RabbitMQSourceBuilder} for the source. + * + * @param <T> type of the source. + * @return a source builder + * @see RabbitMQSourceBuilder + */ + public static <T> RabbitMQSourceBuilder<T> builder() { + return new RabbitMQSourceBuilder<>(); + } + + /** + * The boundedness is always continuous unbounded as this is a streaming-only source. + * + * @return Boundedness continuous unbounded. + * @see Boundedness + */ + @Override + public Boundedness getBoundedness() { + return Boundedness.CONTINUOUS_UNBOUNDED; + } + + /** + * Returns a new initialized source reader of the source's consistency mode. + * + * @param sourceReaderContext context which the reader will be created in. + * @return RabbitMQSourceReader a source reader of the specified consistency type. + * @see RabbitMQSourceReaderBase + */ + @Override + public SourceReader<T, RabbitMQSourceSplit> createReader( + SourceReaderContext sourceReaderContext) { + LOG.info("New Source Reader of type " + consistencyMode + " requested."); Review comment: ```suggestion LOG.info("New Source Reader of type {} requested.", consistencyMode); ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/pom.xml ########## @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.13-SNAPSHOT</version> Review comment: ```suggestion <version>1.15-SNAPSHOT</version> ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/README.md ########## @@ -0,0 +1,24 @@ +# License of the Rabbit MQ Connector Review comment: nit: What about merging `README.md` of RMQSource and `README.md` of RMQSink into this `RMQSource` ? ########## File path: flink-connectors/flink-connector-rabbitmq2/src/test/java/org/apache/flink/connector/rabbitmq2/sink/RabbitMQSinkITCase.java ########## @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.sink; + +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQBaseTest; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQContainerClient; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import org.junit.Before; +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * The tests for the RabbitMQ sink with different consistency modes. As the tests are working a lot + * with timeouts to uphold stream it is possible that tests might fail. + */ +public class RabbitMQSinkITCase extends RabbitMQBaseTest { + + private static AtomicBoolean shouldFail; + + @Before + public void setup() { + shouldFail = new AtomicBoolean(true); + } + + private class GeneratorFailureSource implements SourceFunction<String> { Review comment: ```suggestion private static class GeneratorFailureSource implements SourceFunction<String> { ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * The source enumerator provides the source readers with the split. All source readers receive the + * same split as it only contains information about the connection and in case of exactly-once, the + * seen correlation ids. But in this case, the enumerator makes sure that at maximum one source + * reader receives the split. During exactly-once if multiple reader should be assigned a split a + * {@link RuntimeException} is thrown. + */ +public class RabbitMQSourceEnumerator + implements SplitEnumerator<RabbitMQSourceSplit, RabbitMQSourceEnumState> { + private final SplitEnumeratorContext<RabbitMQSourceSplit> context; + private final ConsistencyMode consistencyMode; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceEnumerator.class); + private RabbitMQSourceSplit split; + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName, + RabbitMQSourceEnumState enumState) { + // The enumState is not used since the enumerator has no state in this architecture. + this(context, consistencyMode, connectionConfig, rmqQueueName); + } + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName) { + this.context = requireNonNull(context); + this.consistencyMode = requireNonNull(consistencyMode); + this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName); + } + + @Override + public void start() { + LOG.info("Start RabbitMQ source enumerator"); + } + + @Override + public void handleSplitRequest(int i, @Nullable String s) { + LOG.info("Split request from reader " + i); + assignSplitToReader(i, split); + } + + @Override + public void addSplitsBack(List<RabbitMQSourceSplit> list, int i) { + if (list.size() == 0) { + return; + } + + // Every Source Reader will only receive one split, thus we will never get back more. + if (list.size() != 1) { + throw new RuntimeException( + "There should only be one split added back at a time. per reader"); + } + + LOG.info("Split returned from reader " + i); + // In case of exactly-once (parallelism 1) the single split gets updated with the + // correlation ids and in case of a recovery we have to store this split until we can + // assign it to the recovered reader. + split = list.get(0); + } + + /** + * In the case of exactly-once multiple readers are not allowed. + * + * @see RabbitMQSourceEnumerator#assignSplitToReader(int, RabbitMQSourceSplit) + * @param i reader id + */ + @Override + public void addReader(int i) {} + + /** @return empty enum state object */ + @Override + public RabbitMQSourceEnumState snapshotState() { Review comment: ```suggestion public RabbitMQSourceEnumState snapshotState(long checkpointId) throws Exception { ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/pom.xml ########## @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId> + <name>Flink : Connectors : RabbitMQ2</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <rabbitmq.version>5.9.0</rabbitmq.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>${rabbitmq.version}</version> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>rabbitmq</artifactId> + <version>1.15.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> Review comment: ```suggestion <artifactId>flink-streaming-java</artifactId> ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/src/main/java/org/apache/flink/connector/rabbitmq2/source/enumerator/RabbitMQSourceEnumerator.java ########## @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.rabbitmq2.source.enumerator; + +import org.apache.flink.api.connector.source.SplitEnumerator; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; +import org.apache.flink.api.connector.source.SplitsAssignment; +import org.apache.flink.connector.rabbitmq2.common.ConsistencyMode; +import org.apache.flink.connector.rabbitmq2.common.RabbitMQConnectionConfig; +import org.apache.flink.connector.rabbitmq2.source.split.RabbitMQSourceSplit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** + * The source enumerator provides the source readers with the split. All source readers receive the + * same split as it only contains information about the connection and in case of exactly-once, the + * seen correlation ids. But in this case, the enumerator makes sure that at maximum one source + * reader receives the split. During exactly-once if multiple reader should be assigned a split a + * {@link RuntimeException} is thrown. + */ +public class RabbitMQSourceEnumerator + implements SplitEnumerator<RabbitMQSourceSplit, RabbitMQSourceEnumState> { + private final SplitEnumeratorContext<RabbitMQSourceSplit> context; + private final ConsistencyMode consistencyMode; + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQSourceEnumerator.class); + private RabbitMQSourceSplit split; + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName, + RabbitMQSourceEnumState enumState) { + // The enumState is not used since the enumerator has no state in this architecture. + this(context, consistencyMode, connectionConfig, rmqQueueName); + } + + public RabbitMQSourceEnumerator( + SplitEnumeratorContext<RabbitMQSourceSplit> context, + ConsistencyMode consistencyMode, + RabbitMQConnectionConfig connectionConfig, + String rmqQueueName) { + this.context = requireNonNull(context); + this.consistencyMode = requireNonNull(consistencyMode); + this.split = new RabbitMQSourceSplit(connectionConfig, rmqQueueName); + } + + @Override + public void start() { + LOG.info("Start RabbitMQ source enumerator"); + } + + @Override + public void handleSplitRequest(int i, @Nullable String s) { + LOG.info("Split request from reader " + i); + assignSplitToReader(i, split); + } + + @Override + public void addSplitsBack(List<RabbitMQSourceSplit> list, int i) { + if (list.size() == 0) { + return; + } + + // Every Source Reader will only receive one split, thus we will never get back more. + if (list.size() != 1) { + throw new RuntimeException( + "There should only be one split added back at a time. per reader"); + } + + LOG.info("Split returned from reader " + i); Review comment: ```suggestion LOG.info("Split returned from reader {}.", i); ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/pom.xml ########## @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId> + <name>Flink : Connectors : RabbitMQ2</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <rabbitmq.version>5.9.0</rabbitmq.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> Review comment: ```suggestion <artifactId>flink-streaming-java</artifactId> ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/pom.xml ########## @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId> + <name>Flink : Connectors : RabbitMQ2</name> + + <packaging>jar</packaging> + + <!-- Allow users to pass custom connector versions --> + <properties> + <rabbitmq.version>5.9.0</rabbitmq.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>com.rabbitmq</groupId> + <artifactId>amqp-client</artifactId> + <version>${rabbitmq.version}</version> + </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>rabbitmq</artifactId> + <version>1.15.1</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils_${scala.binary.version}</artifactId> Review comment: ```suggestion <artifactId>flink-test-utils</artifactId> ``` ########## File path: flink-connectors/flink-connector-rabbitmq2/pom.xml ########## @@ -0,0 +1,84 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connectors</artifactId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>flink-connector-rabbitmq2_${scala.binary.version}</artifactId> Review comment: ```suggestion <artifactId>flink-connector-rabbitmq2</artifactId> ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org