CAMEL-10271: Fixed camel-jt400 consumer to be scheduled so it reads from the 
queue as the jt400 library is not event based.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/77e417c2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/77e417c2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/77e417c2

Branch: refs/heads/camel-2.17.x
Commit: 77e417c2775b15f24cc6f47df7fbdfe7c4a436af
Parents: 1f7abfc
Author: Claus Ibsen <davscl...@apache.org>
Authored: Mon Sep 5 18:22:28 2016 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Mon Sep 5 18:23:18 2016 +0200

----------------------------------------------------------------------
 .../component/jt400/Jt400Configuration.java     | 14 +++++++
 .../component/jt400/Jt400DataQueueConsumer.java | 42 ++++++++++++++------
 .../camel/component/jt400/Jt400Endpoint.java    | 22 +++++-----
 3 files changed, 54 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
index b33a25d..9a2383f 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
@@ -113,6 +113,9 @@ public class Jt400Configuration {
     @UriParam
     private Integer[] outputFieldsLengthArray;
 
+    @UriParam(label = "consumer", defaultValue = "30000")
+    private int readTimeout = 30000;
+
     public Jt400Configuration(String endpointUri, AS400ConnectionPool 
connectionPool) throws URISyntaxException {
         ObjectHelper.notNull(endpointUri, "endpointUri", this);
         ObjectHelper.notNull(connectionPool, "connectionPool", this);
@@ -301,6 +304,17 @@ public class Jt400Configuration {
         this.outputFieldsLengthArray = outputFieldsLengthArray;
     }
 
+    public int getReadTimeout() {
+        return readTimeout;
+    }
+
+    /**
+     * Timeout in millis the consumer will wait while trying to read a new 
message of the data queue.
+     */
+    public void setReadTimeout(int readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+
     public void setOutputFieldsIdx(String outputFieldsIdx) {
         if (outputFieldsIdx != null) {
             String[] outputArray = outputFieldsIdx.split(",");

http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
index c3ff46e..6bf9788 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400DataQueueConsumer.java
@@ -22,16 +22,15 @@ import com.ibm.as400.access.DataQueueEntry;
 import com.ibm.as400.access.KeyedDataQueue;
 import com.ibm.as400.access.KeyedDataQueueEntry;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.impl.PollingConsumerSupport;
+import org.apache.camel.impl.ScheduledPollConsumer;
 
 /**
- * {@link org.apache.camel.PollingConsumer} that polls a data queue for data
+ * A scheduled {@link org.apache.camel.Consumer} that polls a data queue for 
data
  */
-public class Jt400DataQueueConsumer extends PollingConsumerSupport {
+public class Jt400DataQueueConsumer extends ScheduledPollConsumer {
 
-    private final Jt400Endpoint endpoint;
-    
     /**
      * Performs the lifecycle logic of this consumer.
      */
@@ -40,13 +39,28 @@ public class Jt400DataQueueConsumer extends 
PollingConsumerSupport {
     /**
      * Creates a new consumer instance
      */
-    protected Jt400DataQueueConsumer(Jt400Endpoint endpoint) {
-        super(endpoint);
-        this.endpoint = endpoint;
+    public Jt400DataQueueConsumer(Jt400Endpoint endpoint, Processor processor) 
{
+        super(endpoint, processor);
         this.queueService = new Jt400DataQueueService(endpoint);
     }
 
     @Override
+    public Jt400Endpoint getEndpoint() {
+        return (Jt400Endpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        Exchange exchange = receive(getEndpoint().getReadTimeout());
+        if (exchange != null) {
+            getProcessor().process(exchange);
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
     protected void doStart() throws Exception {
         queueService.start();
     }
@@ -56,11 +70,13 @@ public class Jt400DataQueueConsumer extends 
PollingConsumerSupport {
         queueService.stop();
     }
 
+    @Deprecated
     public Exchange receive() {
         // -1 to indicate a blocking read from data queue
         return receive(-1);
     }
 
+    @Deprecated
     public Exchange receiveNoWait() {
         return receive(0);
     }
@@ -85,7 +101,7 @@ public class Jt400DataQueueConsumer extends 
PollingConsumerSupport {
     public Exchange receive(long timeout) {
         BaseDataQueue queue = queueService.getDataQueue();
         try {
-            if (endpoint.isKeyed()) {
+            if (getEndpoint().isKeyed()) {
                 return receive((KeyedDataQueue) queue, timeout);
             } else {
                 return receive((DataQueue) queue, timeout);
@@ -109,7 +125,7 @@ public class Jt400DataQueueConsumer extends 
PollingConsumerSupport {
         Exchange exchange = getEndpoint().createExchange();
         if (entry != null) {
             exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, 
entry.getSenderInformation());
-            if (endpoint.getFormat() == Jt400Configuration.Format.binary) {
+            if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) 
{
                 exchange.getIn().setBody(entry.getData());
             } else {
                 exchange.getIn().setBody(entry.getString());
@@ -120,8 +136,8 @@ public class Jt400DataQueueConsumer extends 
PollingConsumerSupport {
     }
 
     private Exchange receive(KeyedDataQueue queue, long timeout) throws 
Exception {
-        String key = endpoint.getSearchKey();
-        String searchType = endpoint.getSearchType().name();
+        String key = getEndpoint().getSearchKey();
+        String searchType = getEndpoint().getSearchType().name();
         KeyedDataQueueEntry entry;
         if (timeout >= 0) {
             int seconds = (int) timeout / 1000;
@@ -135,7 +151,7 @@ public class Jt400DataQueueConsumer extends 
PollingConsumerSupport {
         Exchange exchange = getEndpoint().createExchange();
         if (entry != null) {
             exchange.getIn().setHeader(Jt400Endpoint.SENDER_INFORMATION, 
entry.getSenderInformation());
-            if (endpoint.getFormat() == Jt400Configuration.Format.binary) {
+            if (getEndpoint().getFormat() == Jt400Configuration.Format.binary) 
{
                 exchange.getIn().setBody(entry.getData());
                 exchange.getIn().setHeader(Jt400Endpoint.KEY, entry.getKey());
             } else {

http://git-wip-us.apache.org/repos/asf/camel/blob/77e417c2/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
----------------------------------------------------------------------
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
index 849e683..e3ad785 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
@@ -24,10 +24,9 @@ import com.ibm.as400.access.AS400;
 import com.ibm.as400.access.AS400ConnectionPool;
 import org.apache.camel.CamelException;
 import org.apache.camel.Consumer;
-import org.apache.camel.PollingConsumer;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
-import org.apache.camel.impl.DefaultEndpoint;
+import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.util.ObjectHelper;
@@ -37,7 +36,7 @@ import org.apache.camel.util.URISupport;
  * The jt400 component allows you to exchanges messages with an AS/400 system 
using data queues or program call.
  */
 @UriEndpoint(scheme = "jt400", title = "JT400", syntax = 
"jt400:userID:password/systemName/objectPath.type", consumerClass = 
Jt400DataQueueConsumer.class, label = "messaging")
-public class Jt400Endpoint extends DefaultEndpoint {
+public class Jt400Endpoint extends ScheduledPollEndpoint {
 
     public static final String KEY = "KEY";
     public static final String SENDER_INFORMATION = "SENDER_INFORMATION";
@@ -70,13 +69,6 @@ public class Jt400Endpoint extends DefaultEndpoint {
     }
 
     @Override
-    public PollingConsumer createPollingConsumer() throws Exception {
-        Jt400DataQueueConsumer answer = new Jt400DataQueueConsumer(this);
-        configurePollingConsumer(answer);
-        return answer;
-    }
-
-    @Override
     public Producer createProducer() throws Exception {
         if (Jt400Type.DTAQ == configuration.getType()) {
             return new Jt400DataQueueProducer(this);
@@ -88,7 +80,7 @@ public class Jt400Endpoint extends DefaultEndpoint {
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         if (Jt400Type.DTAQ == configuration.getType()) {
-            Consumer consumer = new Jt400DataQueueConsumer(this);
+            Consumer consumer = new Jt400DataQueueConsumer(this, processor);
             configureConsumer(consumer);
             return consumer;
         } else {
@@ -260,4 +252,12 @@ public class Jt400Endpoint extends DefaultEndpoint {
         return configuration.isSecured();
     }
 
+    public int getReadTimeout() {
+        return configuration.getReadTimeout();
+    }
+
+    public void setReadTimeout(int readTimeout) {
+        configuration.setReadTimeout(readTimeout);
+    }
+
 }

Reply via email to