This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 76613a0ed4 NIFI-6730 AMQP QoS support
76613a0ed4 is described below
commit 76613a0ed4a90c5e264e0537990278ec9e422536
Author: Mikhail Sapozhnikov <[email protected]>
AuthorDate: Fri Dec 8 14:38:14 2023 +0300
NIFI-6730 AMQP QoS support
This closes #8146.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../apache/nifi/amqp/processors/AMQPConsumer.java | 4 +++-
.../apache/nifi/amqp/processors/ConsumeAMQP.java | 15 +++++++++++-
.../nifi/amqp/processors/AMQPConsumerTest.java | 28 +++++++++++++++-------
.../nifi/amqp/processors/ConsumeAMQPTest.java | 4 +++-
.../apache/nifi/amqp/processors/TestChannel.java | 6 ++++-
5 files changed, 45 insertions(+), 12 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
index e11044845c..04f951836a 100644
---
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
+++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
@@ -43,7 +43,8 @@ final class AMQPConsumer extends AMQPWorker {
private final boolean autoAcknowledge;
private final Consumer consumer;
- AMQPConsumer(final Connection connection, final String queueName, final
boolean autoAcknowledge, ComponentLog processorLog) throws IOException {
+ AMQPConsumer(final Connection connection, final String queueName, final
boolean autoAcknowledge, final int prefetchCount,
+ ComponentLog processorLog) throws IOException {
super(connection, processorLog);
this.validateStringProperty("queueName", queueName);
this.queueName = queueName;
@@ -80,6 +81,7 @@ final class AMQPConsumer extends AMQPWorker {
}
};
+ channel.basicQos(prefetchCount);
channel.basicConsume(queueName, autoAcknowledge, consumer);
}
diff --git
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 23552d6430..6d4e5d01a6 100644
---
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -112,6 +112,17 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
.defaultValue("10")
.required(true)
.build();
+ static final PropertyDescriptor PREFETCH_COUNT = new
PropertyDescriptor.Builder()
+ .name("prefetch.count")
+ .displayName("Prefetch Count")
+ .description("The maximum number of unacknowledged messages for the
consumer. If consumer has this number of unacknowledged messages, AMQP broker
will "
+ + "no longer send new messages until consumer acknowledges some
of the messages already delivered to it."
+ + "Allowed values: from 0 to 65535. 0 means no limit")
+ .addValidator(StandardValidators.createLongValidator(0, 65535, true))
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .defaultValue("0")
+ .required(true)
+ .build();
public static final PropertyDescriptor HEADER_FORMAT = new
PropertyDescriptor.Builder()
.name("header.format")
@@ -167,6 +178,7 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
properties.add(QUEUE);
properties.add(AUTO_ACKNOWLEDGE);
properties.add(BATCH_SIZE);
+ properties.add(PREFETCH_COUNT);
properties.add(HEADER_FORMAT);
properties.add(HEADER_KEY_PREFIX);
properties.add(HEADER_SEPARATOR);
@@ -301,7 +313,8 @@ public class ConsumeAMQP extends
AbstractAMQPProcessor<AMQPConsumer> {
try {
final String queueName = context.getProperty(QUEUE).getValue();
final boolean autoAcknowledge =
context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
- final AMQPConsumer amqpConsumer = new AMQPConsumer(connection,
queueName, autoAcknowledge, getLogger());
+ final int prefetchCount =
context.getProperty(PREFETCH_COUNT).asInteger();
+ final AMQPConsumer amqpConsumer = new AMQPConsumer(connection,
queueName, autoAcknowledge, prefetchCount, getLogger());
return amqpConsumer;
} catch (final IOException ioe) {
diff --git
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
index e4d7b1d162..195bd7adc4 100644
---
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
+++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.Test;
public class AMQPConsumerTest {
+ private static final int DEFAULT_PREFETCH_COUNT = 0;
private ComponentLog processorLog;
@BeforeEach
@@ -55,7 +56,7 @@ public class AMQPConsumerTest {
final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
final TestConnection connection = new
TestConnection(exchangeToRoutingKeymap, routingMap);
- final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1",
true, processorLog);
+ final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1",
true, DEFAULT_PREFETCH_COUNT, processorLog);
consumer.getChannel().basicPublish("myExchange", "key1", new
BasicProperties(), new byte[0]);
consumer.close();
@@ -69,7 +70,7 @@ public class AMQPConsumerTest {
final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
final TestConnection connection = new
TestConnection(exchangeToRoutingKeymap, routingMap);
- final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1",
true, processorLog);
+ final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1",
true, DEFAULT_PREFETCH_COUNT, processorLog);
assertFalse(consumer.closed);
@@ -80,14 +81,14 @@ public class AMQPConsumerTest {
@Test
public void failOnNullConnection() {
- assertThrows(IllegalArgumentException.class, () -> new
AMQPConsumer(null, null, true, processorLog));
+ assertThrows(IllegalArgumentException.class, () -> new
AMQPConsumer(null, null, true, DEFAULT_PREFETCH_COUNT, processorLog));
}
@Test
public void failOnNullQueueName() {
assertThrows(IllegalArgumentException.class, () -> {
Connection conn = new TestConnection(null, null);
- new AMQPConsumer(conn, null, true, processorLog);
+ new AMQPConsumer(conn, null, true, DEFAULT_PREFETCH_COUNT,
processorLog);
});
}
@@ -95,7 +96,7 @@ public class AMQPConsumerTest {
public void failOnEmptyQueueName() {
assertThrows(IllegalArgumentException.class, () -> {
Connection conn = new TestConnection(null, null);
- new AMQPConsumer(conn, " ", true, processorLog);
+ new AMQPConsumer(conn, " ", true, DEFAULT_PREFETCH_COUNT,
processorLog);
});
}
@@ -103,7 +104,7 @@ public class AMQPConsumerTest {
public void failOnNonExistingQueue() {
assertThrows(IOException.class, () -> {
Connection conn = new TestConnection(null, null);
- try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true,
processorLog)) {
+ try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true,
DEFAULT_PREFETCH_COUNT, processorLog)) {
consumer.consume();
}
});
@@ -117,7 +118,7 @@ public class AMQPConsumerTest {
exchangeToRoutingKeymap.put("", "queue1");
Connection conn = new TestConnection(exchangeToRoutingKeymap,
routingMap);
- try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true,
processorLog)) {
+ try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true,
DEFAULT_PREFETCH_COUNT, processorLog)) {
GetResponse response = consumer.consume();
assertNull(response);
}
@@ -132,9 +133,20 @@ public class AMQPConsumerTest {
Connection conn = new TestConnection(exchangeToRoutingKeymap,
routingMap);
conn.createChannel().basicPublish("myExchange", "key1", null, "hello
Joe".getBytes());
- try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true,
processorLog)) {
+ try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true,
DEFAULT_PREFETCH_COUNT, processorLog)) {
GetResponse response = consumer.consume();
assertNotNull(response);
}
}
+
+ @Test
+ public void validatePrefetchSet() throws Exception {
+ final Map<String, List<String>> routingMap =
Collections.singletonMap("key1", Arrays.asList("queue1"));
+ final Map<String, String> exchangeToRoutingKeymap =
Collections.singletonMap("myExchange", "key1");
+ Connection conn = new TestConnection(exchangeToRoutingKeymap,
routingMap);
+ try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true,
100, processorLog)) {
+ TestChannel channel = (TestChannel)consumer.getChannel();
+ assertEquals(100, channel.getPrefetchCount());
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 0655caaa21..fce2c260a1 100644
---
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -393,7 +393,9 @@ public class ConsumeAMQPTest {
throw new IllegalStateException("Consumer already
created");
}
- consumer = new AMQPConsumer(connection,
context.getProperty(ConsumeAMQP.QUEUE).getValue(),
context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
+ consumer = new AMQPConsumer(connection,
context.getProperty(ConsumeAMQP.QUEUE).getValue(),
+
context.getProperty(ConsumeAMQP.AUTO_ACKNOWLEDGE).asBoolean(),
context.getProperty(ConsumeAMQP.PREFETCH_COUNT).asInteger(),
+ getLogger());
return consumer;
} catch (IOException e) {
throw new ProcessException(e);
diff --git
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
index c68ffeefd1..17b267eee1 100644
---
a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
+++
b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
@@ -77,6 +77,7 @@ class TestChannel implements Channel {
private long deliveryTag = 0L;
private final BitSet acknowledgments = new BitSet();
private final BitSet nacks = new BitSet();
+ private int prefetchCount = 0;
public TestChannel(Map<String, String> exchangeToRoutingKeyMappings,
Map<String, List<String>> routingKeyToQueueMappings) {
@@ -222,8 +223,11 @@ class TestChannel implements Channel {
@Override
public void basicQos(int prefetchCount) throws IOException {
- throw new UnsupportedOperationException("This method is not currently
supported as it is not used by current API in testing");
+ this.prefetchCount = prefetchCount;
+ }
+ public int getPrefetchCount() {
+ return this.prefetchCount;
}
@Override