CAMEL-10271: Fixed the camel-jt400 consumer to be a scheduled consumer, so that it polls the data queue, because the jt400 library is not event-based.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77e417c2 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77e417c2 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77e417c2 Branch: refs/heads/camel-2.17.x Commit: 77e417c2775b15f24cc6f47df7fbdfe7c4a436af Parents: 1f7abfc Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Sep 5 18:22:28 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Sep 5 18:23:18 2016 +0200 ---------------------------------------------------------------------- .../component/jt400/Jt400Configuration.java | 14 +++++++ .../component/jt400/Jt400DataQueueConsumer.java | 42 ++++++++++++++------ .../camel/component/jt400/Jt400Endpoint.java | 22 +++++----- 3 files changed, 54 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java ---------------------------------------------------------------------- diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java index b33a25d..9a2383f 100644 --- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java +++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java @@ -113,6 +113,9 @@ public class Jt400Configuration { @UriParam private Integer[] outputFieldsLengthArray; + @UriParam(label = "consumer", defaultValue = "30000") + private int readTimeout = 30000; + public Jt400Configuration(String endpointUri, AS400ConnectionPool connectionPool) throws URISyntaxException { ObjectHelper.notNull(endpointUri, "endpointUri", this); ObjectHelper.notNull(connectionPool, "connectionPool", this); @@ 
-301,6 +304,17 @@ public class Jt400Configuration { this.outputFieldsLengthArray = outputFieldsLengthArray; } + public int getReadTimeout() { + return readTimeout; + } + + /** + * Timeout in millis the consumer will wait while trying to read a new message of the data queue. + */ + public void setReadTimeout(int readTimeout) { + this.readTimeout = readTimeout; + } + public void setOutputFieldsIdx(String outputFieldsIdx) { if (outputFieldsIdx != null) { String[] outputArray = outputFieldsIdx.split(","); http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java index c3ff46e..6bf9788 100644 --- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java +++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java @@ -22,16 +22,15 @@ import com.ibm.as400.access.DataQueueEntry; import com.ibm.as400.access.KeyedDataQueue; import com.ibm.as400.access.KeyedDataQueueEntry; import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.impl.PollingConsumerSupport; +import org.apache.camel.impl.ScheduledPollConsumer; /** - * {@link org.apache.camel.PollingConsumer} that polls a data queue for data + * A scheduled {@link org.apache.camel.Consumer} that polls a data queue for data */ -public class Jt400DataQueueConsumer extends PollingConsumerSupport { +public class Jt400DataQueueConsumer extends ScheduledPollConsumer { - private final Jt400Endpoint endpoint; - /** * Performs the lifecycle logic of this consumer. 
*/ @@ -40,13 +39,28 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport { /** * Creates a new consumer instance */ - protected Jt400DataQueueConsumer(Jt400Endpoint endpoint) { - super(endpoint); - this.endpoint = endpoint; + public Jt400DataQueueConsumer(Jt400Endpoint endpoint, Processor processor) { + super(endpoint, processor); this.queueService = new Jt400DataQueueService(endpoint); } @Override + public Jt400Endpoint getEndpoint() { + return (Jt400Endpoint) super.getEndpoint(); + } + + @Override + protected int poll() throws Exception { + Exchange exchange = receive(getEndpoint().getReadTimeout()); + if (exchange != null) { + getProcessor().process(exchange); + return 1; + } else { + return 0; + } + } + + @Override protected void doStart() throws Exception { queueService.start(); } @@ -56,11 +70,13 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport { queueService.stop(); } + @Deprecated public Exchange receive() { // -1 to indicate a blocking read from data queue return receive(-1); } + @Deprecated public Exchange receiveNoWait() { return receive(0); } @@ -85,7 +101,7 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport { public Exchange receive(long timeout) { BaseDataQueue queue = queueService.getDataQueue(); try { - if (endpoint.isKeyed()) { + if (getEndpoint().isKeyed()) { return receive((KeyedDataQueue) queue, timeout); } else { return receive((DataQueue) queue, timeout); @@ -109,7 +125,7 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport { Exchange exchange = getEndpoint().createExchange(); if (entry != null) { exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, entry.getSenderInformation()); - if (endpoint.getFormat() == Jt400Configuration.Format.binary) { + if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) { exchange.getIn().setBody(entry.getData()); } else { exchange.getIn().setBody(entry.getString()); @@ -120,8 +136,8 @@ public class 
Jt400DataQueueConsumer extends PollingConsumerSupport { } private Exchange receive(KeyedDataQueue queue, long timeout) throws Exception { - String key = endpoint.getSearchKey(); - String searchType = endpoint.getSearchType().name(); + String key = getEndpoint().getSearchKey(); + String searchType = getEndpoint().getSearchType().name(); KeyedDataQueueEntry entry; if (timeout >= 0) { int seconds = (int) timeout / 1000; @@ -135,7 +151,7 @@ public class Jt400DataQueueConsumer extends PollingConsumerSupport { Exchange exchange = getEndpoint().createExchange(); if (entry != null) { exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, entry.getSenderInformation()); - if (endpoint.getFormat() == Jt400Configuration.Format.binary) { + if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) { exchange.getIn().setBody(entry.getData()); exchange.getIn().setHeader(Jt400Endpoint.KEY, entry.getKey()); } else { http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java index 849e683..e3ad785 100644 --- a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java +++ b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java @@ -24,10 +24,9 @@ import com.ibm.as400.access.AS400; import com.ibm.as400.access.AS400ConnectionPool; import org.apache.camel.CamelException; import org.apache.camel.Consumer; -import org.apache.camel.PollingConsumer; import org.apache.camel.Processor; import org.apache.camel.Producer; -import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.impl.ScheduledPollEndpoint; import org.apache.camel.spi.UriEndpoint; import 
org.apache.camel.spi.UriParam; import org.apache.camel.util.ObjectHelper; @@ -37,7 +36,7 @@ import org.apache.camel.util.URISupport; * The jt400 component allows you to exchanges messages with an AS/400 system using data queues or program call. */ @UriEndpoint(scheme = "jt400", title = "JT400", syntax = "jt400:userID:password/systemName/objectPath.type", consumerClass = Jt400DataQueueConsumer.class, label = "messaging") -public class Jt400Endpoint extends DefaultEndpoint { +public class Jt400Endpoint extends ScheduledPollEndpoint { public static final String KEY = "KEY"; public static final String SENDER_INFORMATION = "SENDER_INFORMATION"; @@ -70,13 +69,6 @@ public class Jt400Endpoint extends DefaultEndpoint { } @Override - public PollingConsumer createPollingConsumer() throws Exception { - Jt400DataQueueConsumer answer = new Jt400DataQueueConsumer(this); - configurePollingConsumer(answer); - return answer; - } - - @Override public Producer createProducer() throws Exception { if (Jt400Type.DTAQ == configuration.getType()) { return new Jt400DataQueueProducer(this); @@ -88,7 +80,7 @@ public class Jt400Endpoint extends DefaultEndpoint { @Override public Consumer createConsumer(Processor processor) throws Exception { if (Jt400Type.DTAQ == configuration.getType()) { - Consumer consumer = new Jt400DataQueueConsumer(this); + Consumer consumer = new Jt400DataQueueConsumer(this, processor); configureConsumer(consumer); return consumer; } else { @@ -260,4 +252,12 @@ public class Jt400Endpoint extends DefaultEndpoint { return configuration.isSecured(); } + public int getReadTimeout() { + return configuration.getReadTimeout(); + } + + public void setReadTimeout(int readTimeout) { + configuration.setReadTimeout(readTimeout); + } + }