This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 432ba87 NIFI-5921 - Timeout property for ConsumeJMS
432ba87 is described below
commit 432ba8787f99848d939596aad73f10315b80bf12
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jan 4 10:51:19 2019 +0100
NIFI-5921 - Timeout property for ConsumeJMS
Signed-off-by: Pierre Villard <[email protected]>
This closes #3245.
---
.../apache/nifi/jms/processors/AbstractJMSProcessor.java | 3 ---
.../java/org/apache/nifi/jms/processors/ConsumeJMS.java | 9 ++++++---
.../java/org/apache/nifi/jms/processors/JMSConsumer.java | 4 ++--
.../apache/nifi/jms/processors/JMSPublisherConsumerIT.java | 14 +++++++-------
4 files changed, 15 insertions(+), 15 deletions(-)
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 95a7103..0094eaf 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -221,9 +221,6 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
jmsTemplate.setConnectionFactory(cachingFactory);
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
- // set of properties that may be good candidates for exposure via configuration
- jmsTemplate.setReceiveTimeout(1000);
-
return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
}
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 1f8358e..997c6dd 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -134,7 +134,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.description("How long to wait to consume a message from the remote broker before giving up.")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .defaultValue("0 sec")
+ .defaultValue("1 sec")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@@ -187,9 +187,8 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
final boolean shared = sharedBoolean == null ? false : sharedBoolean;
final String subscriptionName = context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset = context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
- final long timeout = context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
- consumer.consume(destinationName, durable, shared, subscriptionName, charset, timeout, new ConsumerCallback() {
+ consumer.consume(destinationName, durable, shared, subscriptionName, charset, new ConsumerCallback() {
@Override
public void accept(final JMSResponse response) {
if (response == null) {
@@ -220,6 +219,10 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
int ackMode = processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
jmsTemplate.setSessionAcknowledgeMode(ackMode);
+
+ long timeout = processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+ jmsTemplate.setReceiveTimeout(timeout);
+
return new JMSConsumer(connectionFactory, jmsTemplate, this.getLogger());
}
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index e6e5913..a2c73b4 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -80,7 +80,7 @@ final class JMSConsumer extends JMSWorker {
}
- public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset, final long timeout,
+ public void consume(final String destinationName, final boolean durable, final boolean shared, final String subscriberName, final String charset,
final ConsumerCallback consumerCallback) {
this.jmsTemplate.execute(new SessionCallback<Void>() {
@Override
@@ -88,7 +88,7 @@ final class JMSConsumer extends JMSWorker {
final MessageConsumer msgConsumer = createMessageConsumer(session, destinationName, durable, shared, subscriberName);
try {
- final Message message = msgConsumer.receive(timeout);
+ final Message message = msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
JMSResponse response = null;
if (message != null) {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index eed9276..7812e71 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -113,7 +113,7 @@ public class JMSPublisherConsumerIT {
});
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
- consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+ consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
// noop
@@ -143,7 +143,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
- consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+ consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@@ -190,7 +190,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), consumeTemplate, mock(ComponentLog.class));
for (int j = 0; j < 1000 && msgCount.get() < 4000; j++) {
- consumer.consume(destinationName, false, false, null, "UTF-8", 1000, callback);
+ consumer.consume(destinationName, false, false, null, "UTF-8", callback);
}
} finally {
((CachingConnectionFactory) consumeTemplate.getConnectionFactory()).destroy();
@@ -229,7 +229,7 @@ public class JMSPublisherConsumerIT {
JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
final AtomicBoolean callbackInvoked = new AtomicBoolean();
try {
- consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+ consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
callbackInvoked.set(true);
@@ -246,7 +246,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
while (!callbackInvoked.get()) {
- consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+ consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@@ -265,7 +265,7 @@ public class JMSPublisherConsumerIT {
// receiving next message and fail again
try {
while (!callbackInvoked.get()) {
- consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+ consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {
@@ -287,7 +287,7 @@ public class JMSPublisherConsumerIT {
// should receive the same message, but will process it successfully
try {
while (!callbackInvoked.get()) {
- consumer.consume(destinationName, false, false, null, "UTF-8", 1000, new ConsumerCallback() {
+ consumer.consume(destinationName, false, false, null, "UTF-8", new ConsumerCallback() {
@Override
public void accept(JMSResponse response) {
if (response == null) {