fapaul commented on a change in pull request #16023:
URL: https://github.com/apache/flink/pull/16023#discussion_r655123048
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -325,11 +325,15 @@ private void processMessage(Delivery delivery,
RMQCollectorImpl collector) throw
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
final RMQCollectorImpl collector = new RMQCollectorImpl(ctx);
+ final long timeout = rmqConnectionConfig.getDeliveryTimeout();
while (running) {
- Delivery delivery = consumer.nextDelivery();
+ Delivery delivery =
+ timeout > 0 ? consumer.nextDelivery(timeout) :
consumer.nextDelivery();
Review comment:
IMO we should change the validation slightly to always require a value
greater than 0. I assume that most RabbitMQ uses cases are not very
performance-critical and can easily take a small penalty.
Preventing the bug from happening is the highest priority.
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
##########
@@ -132,8 +142,12 @@ private RMQConnectionConfig(
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
- Integer prefetchCount) {
+ Integer prefetchCount,
+ Long deliveryTimeout) {
Preconditions.checkNotNull(uri, "Uri can not be null");
+ Preconditions.checkArgument(
+ deliveryTimeout == null || deliveryTimeout >= 0,
Review comment:
```suggestion
deliveryTimeout == null || deliveryTimeout > 0,
```
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -419,6 +426,48 @@ protected Connection setupConnection() throws Exception {
Mockito.verify(channel, Mockito.times(0)).basicQos(anyInt());
}
+ @Test
+ public void testDeliveryTimeout() throws Exception {
+ source.autoAck = false;
Review comment:
Thanks, and I am a bit sorry for the missed styling check but you've
already fixed it 👍
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
##########
@@ -506,16 +534,45 @@ public Builder setPrefetchCount(int prefetchCount) {
return this;
}
+ /**
+ * Enables setting the message delivery timeout in the queueing
consumer. Only applicable to
+ * the {@link RMQSource}. Set to 0 for unlimited (the consumer will be
blocked until an
+ * element becomes available). Default is 30000.
+ *
+ * @param deliveryTimeout maximum wait time, in milliseconds, for the
next message delivery
+ * @return the Builder
+ */
+ public Builder setDeliveryTimeout(long deliveryTimeout) {
+ Preconditions.checkArgument(
+ deliveryTimeout >= 0, "deliveryTimeout can not be
negative");
Review comment:
```suggestion
deliveryTimeout > 0, "deliveryTimeout can not be
negative");
```
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
##########
@@ -90,12 +94,16 @@ private RMQConnectionConfig(
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
- Integer prefetchCount) {
+ Integer prefetchCount,
+ Long deliveryTimeout) {
Preconditions.checkNotNull(host, "host can not be null");
Preconditions.checkNotNull(port, "port can not be null");
Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
Preconditions.checkNotNull(username, "username can not be null");
Preconditions.checkNotNull(password, "password can not be null");
+ Preconditions.checkArgument(
+ deliveryTimeout == null || deliveryTimeout >= 0,
Review comment:
As a followup to not allow shooting themself in the foot.
```suggestion
deliveryTimeout == null || deliveryTimeout > 0,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]