fapaul commented on a change in pull request #16023:
URL: https://github.com/apache/flink/pull/16023#discussion_r644718318
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -419,6 +421,40 @@ protected Connection setupConnection() throws Exception {
Mockito.verify(channel, Mockito.times(0)).basicQos(anyInt());
}
+ private static class CallsRealMethodsWithDelay extends CallsRealMethods {
Review comment:
Nit: Can you move this helper after all `@Test` methods?
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
##########
@@ -506,16 +532,45 @@ public Builder setPrefetchCount(int prefetchCount) {
return this;
}
+ /**
+ * Enables setting the next message delivery timeout in the queueing consumer. Only
Review comment:
```suggestion
* Enables setting the message delivery timeout in the queueing consumer. Only
```
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
##########
@@ -265,6 +279,16 @@ public Integer getRequestedHeartbeat() {
return Optional.ofNullable(prefetchCount);
}
+ /**
+ * Retrieve the next message delivery timeout used in the queueing consumer. If not specified
+ * explicitly, the default value of 30000 milliseconds will be returned.
+ *
+ * @return the next message delivery timeout, in milliseconds; zero for unlimited
+ */
+ public long getDeliveryTimeout() {
+ return Optional.ofNullable(deliveryTimeout).orElse(DEFAULT_DELIVERY_TIMEOUT);
Review comment:
Nit: Can you move this call to the constructor so that it is not evaluated
every time?
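
To illustrate the nit, here is a minimal sketch of resolving the default once
in the constructor. `DeliveryTimeoutConfig` is a hypothetical, trimmed-down
stand-in for `RMQConnectionConfig`, not the actual class; the 30000 ms default
and the non-negative check are taken from the diff in this PR.

```java
/** Hypothetical stand-in for RMQConnectionConfig, reduced to the timeout field. */
final class DeliveryTimeoutConfig {
    /** Default from the Javadoc in this PR: 30000 milliseconds. */
    static final long DEFAULT_DELIVERY_TIMEOUT = 30000L;

    /** Stored as a primitive, so the getter has nothing left to resolve. */
    private final long deliveryTimeout;

    DeliveryTimeoutConfig(Long deliveryTimeout) {
        // Same validation as the precondition added in the PR.
        if (deliveryTimeout != null && deliveryTimeout < 0) {
            throw new IllegalArgumentException("deliveryTimeout can not be negative");
        }
        // Resolve the default exactly once, at construction time.
        this.deliveryTimeout =
                deliveryTimeout == null ? DEFAULT_DELIVERY_TIMEOUT : deliveryTimeout;
    }

    long getDeliveryTimeout() {
        return deliveryTimeout;
    }
}
```

With this shape the getter is a plain field read and no `Optional` is
allocated per call.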
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
##########
@@ -265,6 +279,16 @@ public Integer getRequestedHeartbeat() {
return Optional.ofNullable(prefetchCount);
}
+ /**
+ * Retrieve the next message delivery timeout used in the queueing consumer. If not specified
Review comment:
"Next" feels a bit misplaced because the delivery timeout is effectively
static and not reconfigurable.
```suggestion
* Retrieve the message delivery timeout used in the queueing consumer. If not specified
```
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/common/RMQConnectionConfig.java
##########
@@ -90,12 +94,16 @@ private RMQConnectionConfig(
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
- Integer prefetchCount) {
+ Integer prefetchCount,
+ Long deliveryTimeout) {
Preconditions.checkNotNull(host, "host can not be null");
Preconditions.checkNotNull(port, "port can not be null");
Preconditions.checkNotNull(virtualHost, "virtualHost can not be null");
Preconditions.checkNotNull(username, "username can not be null");
Preconditions.checkNotNull(password, "password can not be null");
+ Preconditions.checkArgument(
+ deliveryTimeout == null || deliveryTimeout >= 0,
+ "deliveryTimeout can not be negative");
Review comment:
Thanks for the clarification
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java
##########
@@ -419,6 +426,48 @@ protected Connection setupConnection() throws Exception {
Mockito.verify(channel, Mockito.times(0)).basicQos(anyInt());
}
+ @Test
+ public void testDeliveryTimeout() throws Exception {
+ source.autoAck = false;
Review comment:
I think that with the current setup of the RabbitMQ source, the added
functionality is very hard to test: we would need access to the underlying
`queue` in the consumer to mock its behavior.
My suggestion would be to change this test slightly so that it ensures
`consumer.nextDelivery()` is never called.
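
As a rough sketch of that idea without touching the internal queue, a
recording fake can count which overload gets invoked. Everything below
(`DeliverySource`, `RecordingSource`, `SourceLoop`) is hypothetical and only
mirrors the two `nextDelivery` overloads; in `RMQSourceTest` the same check
would more likely be a Mockito `verify(..., never())` on the mocked consumer.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical two-method view of a queueing consumer; not the RabbitMQ API. */
interface DeliverySource {
    String nextDelivery();               // untimed variant, may block forever
    String nextDelivery(long timeoutMs); // timed variant, null on timeout
}

/** Recording fake that counts how often each overload is invoked. */
final class RecordingSource implements DeliverySource {
    final AtomicInteger untimedCalls = new AtomicInteger();
    final AtomicInteger timedCalls = new AtomicInteger();

    @Override
    public String nextDelivery() {
        untimedCalls.incrementAndGet();
        return null;
    }

    @Override
    public String nextDelivery(long timeoutMs) {
        timedCalls.incrementAndGet();
        return null; // behave as if every poll timed out
    }
}

/** Hypothetical loop under test: always polls through the timed overload. */
final class SourceLoop {
    static void poll(DeliverySource source, long timeoutMs, int iterations) {
        for (int i = 0; i < iterations; i++) {
            source.nextDelivery(timeoutMs);
        }
    }
}
```

The test then only asserts on the counters: the untimed count must stay at
zero.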
##########
File path:
flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java
##########
@@ -325,11 +325,15 @@ private void processMessage(Delivery delivery, RMQCollectorImpl collector) throw
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
final RMQCollectorImpl collector = new RMQCollectorImpl(ctx);
+ final long timeout = rmqConnectionConfig.getDeliveryTimeout();
while (running) {
- Delivery delivery = consumer.nextDelivery();
+ Delivery delivery =
+ timeout > 0 ? consumer.nextDelivery(timeout) : consumer.nextDelivery();
Review comment:
Can the `timeout` actually be below zero? In general, I would not allow
calling `consumer.nextDelivery()` at all because it prevents the job from
making progress.
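
To make the concern concrete, here is a minimal sketch of a loop that only
ever polls with a timeout, so the `running` flag is re-checked even when no
message arrives. It uses `java.util.concurrent.BlockingQueue` as a stand-in
for the consumer; `TimedPollLoop` and its parameters are hypothetical, and
the assumption is that a timed-out `nextDelivery(timeout)` yields `null` in
the same way `poll` does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical loop; BlockingQueue stands in for the queueing consumer. */
final class TimedPollLoop {
    /** Runs until 'running' is cleared; returns how many polls timed out. */
    static int run(
            BlockingQueue<String> queue,
            long timeoutMillis,
            AtomicBoolean running,
            Consumer<String> handler) {
        if (timeoutMillis <= 0) {
            throw new IllegalArgumentException("timeout must be positive");
        }
        int timeouts = 0;
        while (running.get()) {
            String delivery;
            try {
                // Timed poll: returns null instead of blocking forever.
                delivery = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
            if (delivery == null) {
                timeouts++; // nothing arrived; loop back and re-check 'running'
                continue;
            }
            handler.accept(delivery);
        }
        return timeouts;
    }
}
```

With an untimed call in the same position, clearing `running` on an empty
queue would have no effect until the next message arrived.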