This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new d34f104  [CAMEL-15170] initial commit (add message queue support to 
JT400 component) (#3917)
d34f104 is described below

commit d34f104504a78a95d0c82f9b33db8df5faf504fa
Author: Jesse Gorzinski <[email protected]>
AuthorDate: Fri Jun 19 02:47:18 2020 -0500

    [CAMEL-15170] initial commit (add message queue support to JT400 component) 
(#3917)
    
    * CAMEL-15170: initial commit (add message queue support)
    
    * style changes for -Psourcecheck compliance
    
    * remove deprecated receive functions
    
    * Set headers for message file and id
    
    * fix order of 3-part job name
    
    * add support for different message actions
    
    * improve conciseness of header-setting
    
    * set header for message key
    
    * bugfix: only use message key when action is SAME
    
    * add message type header
    
    * add documentation for jt400 msgq support
    
    * remove "type" query parameter as it is not needed
    
    * add header info to doc, plus minor cleanup
    
    * rename header constants (jt400 scope) and stop setting MESSAGE_KEY header
    
    * cleanup comments/doc
    
    * updated doc for messageAction parameter
    
    * improve doc surrounding message queue processing
    
    * cleanup misc javadoc errors
    
    * move header constants into new class
    
    * change header constant values
    
    Co-authored-by: Jesse Gorzinski <[email protected]>
---
 .../camel-jt400/src/main/docs/jt400-component.adoc |  74 ++++++++++-
 .../camel/component/jt400/Jt400Configuration.java  |  47 ++++++-
 .../jt400/{Jt400Type.java => Jt400Constants.java}  |  13 +-
 .../camel/component/jt400/Jt400Endpoint.java       |  20 ++-
 .../component/jt400/Jt400MsgQueueConsumer.java     | 139 +++++++++++++++++++++
 .../component/jt400/Jt400MsgQueueProducer.java     |  57 +++++++++
 .../component/jt400/Jt400MsgQueueService.java      | 102 +++++++++++++++
 .../apache/camel/component/jt400/Jt400Type.java    |   2 +-
 8 files changed, 442 insertions(+), 12 deletions(-)

diff --git a/components/camel-jt400/src/main/docs/jt400-component.adoc 
b/components/camel-jt400/src/main/docs/jt400-component.adoc
index f1de98b..ae2c0c5 100644
--- a/components/camel-jt400/src/main/docs/jt400-component.adoc
+++ b/components/camel-jt400/src/main/docs/jt400-component.adoc
@@ -2,7 +2,7 @@
 = JT400 Component
 :docTitle: JT400
 :artifactId: camel-jt400
-:description: Exchanges messages with an AS/400 system using data queues or 
program call.
+:description: Exchanges messages with an AS/400 system using data queues, 
message queues, or program call.
 :since: 1.5
 :supportLevel: Stable
 :component-header: Both producer and consumer are supported
@@ -29,11 +29,20 @@ for this component:
 
 == URI format
 
+To send or receive data from a data queue
+
 [source,java]
 ----------------------------------------------------------------------
 jt400://user:password@system/QSYS.LIB/LIBRARY.LIB/QUEUE.DTAQ[?options]
 ----------------------------------------------------------------------
 
+To send or receive messages from a message queue
+
+[source,java]
+----------------------------------------------------------------------
+jt400://user:password@system/QSYS.LIB/LIBRARY.LIB/QUEUE.MSGQ[?options]
+----------------------------------------------------------------------
+
 To call remote program
 
 [source,java]
@@ -71,7 +80,7 @@ jt400:userID:password/systemName/objectPath.type
 
 with the following path and query parameters:
 
-=== Path Parameters (5 parameters):
+=== Path Parameters (4 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -81,11 +90,10 @@ with the following path and query parameters:
 | *password* | *Required* Returns the password of the AS/400 user. |  | String
 | *systemName* | *Required* Returns the name of the AS/400 system. |  | String
 | *objectPath* | *Required* Returns the fully qualified integrated file system 
path name of the target object of this endpoint. |  | String
-| *type* | *Required* Whether to work with data queues or remote program call. 
The value can be one of: DTAQ, PGM, SRVPGM |  | Jt400Type
 |===
 
 
-=== Query Parameters (33 parameters):
+=== Query Parameters (34 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -97,6 +105,7 @@ with the following path and query parameters:
 | *keyed* (common) | Whether to use keyed or non-keyed data queues. | false | 
boolean
 | *searchKey* (common) | Search key for keyed data queues. |  | String
 | *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the 
Camel routing Error Handler, which mean any exceptions occurred while the 
consumer is trying to pickup incoming messages, or the likes, will now be 
processed as a message and handled by the routing Error Handler. By default the 
consumer will use the org.apache.camel.spi.ExceptionHandler to deal with 
exceptions, that will be logged at WARN or ERROR level and ignored. | false | 
boolean
+| *messageAction* (consumer) | Action to be taken on messages when read from a 
message queue. Messages can be marked as old (OLD), removed from the queue 
(REMOVE), or neither (SAME). The value can be one of: OLD, REMOVE, SAME | OLD | 
MessageAction
 | *readTimeout* (consumer) | Timeout in millis the consumer will wait while 
trying to read a new message of the data queue. | 30000 | int
 | *searchType* (consumer) | Search type such as EQ for equal etc. The value 
can be one of: EQ, NE, LT, LE, GT, GE | EQ | SearchType
 | *sendEmptyMessageWhenIdle* (consumer) | If the polling consumer did not poll 
any files, you can enable this option to send an empty message (no body) 
instead. | false | boolean
@@ -131,13 +140,27 @@ with the following path and query parameters:
 
 == Usage
 
-When configured as a consumer endpoint, the endpoint will poll a data
+When configured as a data queue consumer endpoint, the endpoint will poll a 
data
 queue on a remote system. For every entry on the data queue, a new
 `Exchange` is sent with the entry's data in the _In_ message's body,
 formatted either as a `String` or a `byte[]`, depending on the format.
 For a provider endpoint, the _In_ message body contents will be put on
 the data queue as either raw bytes or text.
 
+When configured as a message queue consumer endpoint, the endpoint will poll
+a message queue on a remote system. For every entry on the queue, a new
+`Exchange` is sent with the entry's data in the _In_ message's body. The
+data is always formatted as a `String`. Note that only new messages will
+be processed. That is, any existing messages on the queue that have already
+been handled by another program will not be processed by this endpoint.
+
+For a data queue provider endpoint, the _In_ message body contents will be
+put on the data queue as either raw bytes or text.
+
+For a message queue provider endpoint, the _In_ message body contents are
+presumed to be text and sent to the queue as an informational message.
+Inquiry messages or messages requiring a message ID are not supported.
+
 == Connection pool
 
 *Since Camel 2.10*
@@ -156,6 +179,31 @@ String array or byte[] array with the values as they were 
returned by
 the program (the input only parameters will contain the same data as the
 beginning of the invocation). This endpoint does not implement a provider 
endpoint!
 
+== Message headers
+
+=== Consumer headers when consuming from data queues
+
+The following headers are potentially available. If the values could not
+be determined, the headers will not be set.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|===
+| Header constant                          | Header value                      
| Type    | Description
+| Jt400Endpoint.SENDER_INFORMATION         | "SENDER_INFORMATION"              
| String  | Returns the sender information for this data queue entry, or an 
empty string if not available
+|===
+
+=== Consumer headers when consuming from message queues
+
+The following headers are potentially available. If the values could not
+be determined, the headers will not be set.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|===
+| Header constant                          | Header value                      
| Type    | Description
+| Jt400Constants.MESSAGE_ID                | "CamelJt400MessageID"             
| String  | The message identifier
+| Jt400Constants.MESSAGE_FILE              | "CamelJt400MessageFile"           
| String  | The message file name
+| Jt400Constants.MESSAGE_TYPE              | "CamelJt400MessageType"           
| Integer | The message type (corresponds to constants defined in the 
AS400Message class)
+| Jt400Endpoint.SENDER_INFORMATION         | "SENDER_INFORMATION"              
| String  | The job identifier of the sending job
+|===
+
 == Example
 
 In the snippet below, the data for an exchange sent to the
@@ -209,5 +257,21 @@ 
from("jt400://username:password@system/lib.lib/MSGOUTDQ.DTAQ?keyed=true&searchKe
 .to("jms:queue:output");
 
-------------------------------------------------------------------------------------------------------
 
+=== Writing to message queues
+
+[source,java]
+------------------------------------------------------------------------
+from("jms:queue:input")
+.to("jt400://username:password@system/lib.lib/MSGINQ.MSGQ");
+------------------------------------------------------------------------
+
+=== Reading from a message queue
+
+[source,java]
+-------------------------------------------------------------------------------------------------------
+from("jt400://username:password@system/lib.lib/MSGOUTQ.MSGQ")
+.to("jms:queue:output");
+-------------------------------------------------------------------------------------------------------
+
 
 include::camel-spring-boot::page$jt400-starter.adoc[]
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
index 5630203..0bbaafd 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Configuration.java
@@ -23,6 +23,7 @@ import java.net.URISyntaxException;
 import com.ibm.as400.access.AS400;
 import com.ibm.as400.access.AS400ConnectionPool;
 import com.ibm.as400.access.ConnectionPoolException;
+import com.ibm.as400.access.MessageQueue;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
@@ -58,6 +59,35 @@ public class Jt400Configuration {
         binary
     }
 
+    public enum MessageAction {
+        /**
+         * Keep the message in the message queue and mark it as an old message
+         */
+        OLD(MessageQueue.OLD),
+        /**
+         * Remove the message from the message queue
+         */
+        REMOVE(MessageQueue.REMOVE),
+        /**
+         * Keep the message in the message queue without changing its new or 
old designation
+         */
+        SAME(MessageQueue.SAME);
+
+        private String jt400Value;
+        private MessageAction(final String jt400Value) {
+            this.jt400Value = jt400Value;
+        }
+        /**
+         * Returns the string literal value that can be used for
+         * APIs from the JTOpen (jt400) libraries
+         *
+         * @return a value suitable for use with jt400 libraries
+         */
+        public String getJt400Value() {
+            return jt400Value;
+        }
+    }
+
     /**
      * Logging tool.
      */
@@ -86,7 +116,6 @@ public class Jt400Configuration {
     @UriPath @Metadata(required = true)
     private String objectPath;
 
-    @UriPath @Metadata(required = true)
     private Jt400Type type;
 
     @UriParam
@@ -119,6 +148,9 @@ public class Jt400Configuration {
     @UriParam(label = "producer")
     private String procedureName;
 
+    @UriParam(label = "consumer", defaultValue = "OLD")
+    private MessageAction messageAction = MessageAction.OLD;
+
     public Jt400Configuration(String endpointUri, AS400ConnectionPool 
connectionPool) throws URISyntaxException {
         ObjectHelper.notNull(endpointUri, "endpointUri", this);
         ObjectHelper.notNull(connectionPool, "connectionPool", this);
@@ -329,6 +361,19 @@ public class Jt400Configuration {
         this.procedureName = procedureName;
     }
 
+    public MessageAction getMessageAction() {
+        return messageAction;
+    }
+
+    /**
+     * Action to be taken on messages when read from a message queue.
+     * Messages can be marked as old ("OLD"), removed from the queue
+     * ("REMOVE"), or neither ("SAME").
+     */
+    public void setMessageAction(MessageAction messageAction) {
+        this.messageAction = messageAction;
+    }
+
     public void setOutputFieldsIdx(String outputFieldsIdx) {
         if (outputFieldsIdx != null) {
             String[] outputArray = outputFieldsIdx.split(",");
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Type.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Constants.java
old mode 100644
new mode 100755
similarity index 64%
copy from 
components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Type.java
copy to 
components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Constants.java
index 257b462..f420155
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Type.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Constants.java
@@ -16,7 +16,16 @@
  */
 package org.apache.camel.component.jt400;
 
-public enum Jt400Type {
+public interface Jt400Constants {
 
-    DTAQ, PGM, SRVPGM
+    //header names
+    public static final String SENDER_INFORMATION = "SENDER_INFORMATION";
+
+    // Used only for keyed data queue support
+    public static final String KEY = "KEY";
+
+    // Used only for message queue support
+    public static final String MESSAGE_ID = "CamelJt400MessageID";
+    public static final String MESSAGE_FILE = "CamelJt400MessageFile";
+    public static final String MESSAGE_TYPE = "CamelJt400MessageType";
 }
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
index 276cde1..9fe122e 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Endpoint.java
@@ -36,13 +36,13 @@ import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.URISupport;
 
 /**
- * Exchanges messages with an AS/400 system using data queues or program call.
+ * Exchanges messages with an AS/400 system using data queues, message queues, 
or program call.
  */
 @UriEndpoint(firstVersion = "1.5.0", scheme = "jt400", title = "JT400", syntax 
= "jt400:userID:password/systemName/objectPath.type", category = 
{Category.MESSAGING})
 public class Jt400Endpoint extends ScheduledPollEndpoint implements 
MultipleConsumersSupport {
 
-    public static final String KEY = "KEY";
-    public static final String SENDER_INFORMATION = "SENDER_INFORMATION";
+    public static final String KEY = Jt400Constants.KEY;
+    public static final String SENDER_INFORMATION = 
Jt400Constants.SENDER_INFORMATION;
 
     @UriParam
     private final Jt400Configuration configuration;
@@ -79,6 +79,8 @@ public class Jt400Endpoint extends ScheduledPollEndpoint 
implements MultipleCons
     public Producer createProducer() throws Exception {
         if (Jt400Type.DTAQ == configuration.getType()) {
             return new Jt400DataQueueProducer(this);
+        } else if (Jt400Type.MSGQ == configuration.getType()) {
+            return new Jt400MsgQueueProducer(this);
         } else {
             return new Jt400PgmProducer(this);
         }
@@ -90,6 +92,10 @@ public class Jt400Endpoint extends ScheduledPollEndpoint 
implements MultipleCons
             Consumer consumer = new Jt400DataQueueConsumer(this, processor);
             configureConsumer(consumer);
             return consumer;
+        } else if (Jt400Type.MSGQ == configuration.getType()) {
+            Consumer consumer = new Jt400MsgQueueConsumer(this, processor);
+            configureConsumer(consumer);
+            return consumer;
         } else {
             throw new OperationNotSupportedException();
         }
@@ -270,6 +276,14 @@ public class Jt400Endpoint extends ScheduledPollEndpoint 
implements MultipleCons
         return configuration.getProcedureName();
     }
 
+    public void setMessageAction(Jt400Configuration.MessageAction 
messageAction) {
+        configuration.setMessageAction(messageAction);
+    }
+
+    public Jt400Configuration.MessageAction getMessageAction() {
+        return configuration.getMessageAction();
+    }
+
     @Override
     public boolean isMultipleConsumersSupported() {
         return true;
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
new file mode 100755
index 0000000..30878dd
--- /dev/null
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueConsumer.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jt400;
+
+import com.ibm.as400.access.MessageQueue;
+import com.ibm.as400.access.QueuedMessage;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.support.ScheduledPollConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A scheduled {@link org.apache.camel.Consumer} that polls a message queue 
for new messages
+ */
+public class Jt400MsgQueueConsumer extends ScheduledPollConsumer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(Jt400MsgQueueConsumer.class);
+
+    /**
+     * Performs the lifecycle logic of this consumer.
+     */
+    private final Jt400MsgQueueService queueService;
+
+    private byte[] messageKey;
+
+    /**
+     * Creates a new consumer instance
+     */
+    public Jt400MsgQueueConsumer(Jt400Endpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.queueService = new Jt400MsgQueueService(endpoint);
+        this.messageKey = null;
+    }
+
+    @Override
+    public Jt400Endpoint getEndpoint() {
+        return (Jt400Endpoint) super.getEndpoint();
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        Exchange exchange = receive(getEndpoint().getReadTimeout());
+        if (exchange != null) {
+            getProcessor().process(exchange);
+            return 1;
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        queueService.start();
+        super.doStart();
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        super.doStop();
+        queueService.stop();
+    }
+
+    /**
+     * Receives an entry from a message queue and returns an {@link Exchange} 
to
+     * send this data, which will be received/sent as
+     * a <code>String</code>.
+     * <p/>
+     * The following message headers may be set by the receiver
+     * <ul>
+     * <li>SENDER_INFORMATION: The Sender Information from the message</li>
+     * <li>CamelJt400MessageID: The message identifier</li>
+     * <li>CamelJt400MessageFile: The message file</li>
+     * <li>CamelJt400MessageType: The message type (corresponds to constants 
defined in the AS400Message class)</li>
+     * </ul>
+     *
+     * @param timeout time to wait when reading from message queue. A value of 
-1
+     *                indicates an infinite wait time.
+     */
+    public Exchange receive(long timeout) {
+        MessageQueue queue = queueService.getMsgQueue();
+        try {
+            return receive(queue, timeout);
+        } catch (Exception e) {
+            throw new RuntimeCamelException("Unable to read from message 
queue: " + queue.getPath(), e);
+        }
+    }
+
+    private synchronized Exchange receive(MessageQueue queue, long timeout) 
throws Exception {
+        QueuedMessage entry;
+        int seconds = (timeout >= 0) ? (int) timeout / 1000 : -1;
+        LOG.trace("Reading from message queue: {} with {} seconds timeout", 
queue.getPath(), -1 == seconds ? "infinite" : seconds);
+
+        Jt400Configuration.MessageAction messageAction = 
getEndpoint().getMessageAction();
+        entry = queue.receive(messageKey, //message key
+                              seconds,    //timeout
+                              messageAction.getJt400Value(),  // message action
+                              null == messageKey ? MessageQueue.ANY : 
MessageQueue.NEXT); // types of messages
+
+        if (null == entry) {
+            return null;
+        }
+        // Need to tuck away the message key if the message action is SAME, 
otherwise
+        // we'll just keep retrieving the same message over and over
+        if (Jt400Configuration.MessageAction.SAME == messageAction) {
+            this.messageKey = entry.getKey();
+        }
+
+        Exchange exchange = getEndpoint().createExchange();
+        exchange.getIn().setHeader(Jt400Constants.SENDER_INFORMATION, 
entry.getFromJobNumber() + "/" + entry.getUser() + "/" + 
entry.getFromJobName());
+        setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_ID, 
entry.getID());
+        setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_FILE, 
entry.getFileName());
+        setHeaderIfValueNotNull(exchange.getIn(), Jt400Constants.MESSAGE_TYPE, 
entry.getType());
+        exchange.getIn().setBody(entry.getText());
+        return exchange;
+    }
+    private static void setHeaderIfValueNotNull(final Message message, final 
String header, final Object value) {
+        if (null == value) {
+            return;
+        }
+        message.setHeader(header, value);
+    }
+}
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueProducer.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueProducer.java
new file mode 100755
index 0000000..98dcda3
--- /dev/null
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueProducer.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jt400;
+
+import com.ibm.as400.access.MessageQueue;
+import org.apache.camel.Exchange;
+import org.apache.camel.Producer;
+import org.apache.camel.support.DefaultProducer;
+
+/**
+ * {@link Producer} to send data to an IBM i message queue.
+ */
+public class Jt400MsgQueueProducer extends DefaultProducer {
+
+    private final Jt400Endpoint endpoint;
+    
+    /**
+     * Creates a new producer instance bound to the given endpoint.
+     */
+    protected Jt400MsgQueueProducer(Jt400Endpoint endpoint) {
+        super(endpoint);
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Sends the {@link Exchange}'s in body to the message queue as an 
informational message.
+     * Data will be sent as a
+     * <code>String</code>.
+     */
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        try (Jt400MsgQueueService queueService = new 
Jt400MsgQueueService(endpoint)) {
+            queueService.start();
+            process(queueService.getMsgQueue(), exchange);
+        }
+    }
+
+    private void process(MessageQueue queue, Exchange exchange) throws 
Exception {
+        String msgText = exchange.getIn().getBody(String.class);
+        queue.sendInformational(msgText);
+    }
+
+}
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueService.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueService.java
new file mode 100755
index 0000000..995d7b6
--- /dev/null
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400MsgQueueService.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.jt400;
+
+import java.io.IOException;
+
+import com.ibm.as400.access.AS400;
+import com.ibm.as400.access.MessageQueue;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.Service;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Pseudo-abstract class that encapsulates Service logic common to
+ * {@link Jt400MsgQueueConsumer} and {@link Jt400MsgQueueProducer}.
+ */
+class Jt400MsgQueueService implements Service {
+    
+    /**
+     * Logging tool.
+     */
+    private static final Logger LOG = 
LoggerFactory.getLogger(Jt400MsgQueueService.class);
+    
+    /**
+     * Endpoint which this service connects to.
+     */
+    private final Jt400Endpoint endpoint;
+    
+    /**
+     * Message queue object that corresponds to the endpoint of this service 
(null if stopped).
+     */
+    private MessageQueue queue;
+    
+    /**
+     * Creates a {@code Jt400MsgQueueService} that connects to the specified
+     * endpoint.
+     * 
+     * @param endpoint endpoint which this service connects to
+     */
+    Jt400MsgQueueService(Jt400Endpoint endpoint) {
+        ObjectHelper.notNull(endpoint, "endpoint", this);
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public void start() {
+        if (queue == null) {
+            AS400 system = endpoint.getSystem();
+            queue = new MessageQueue(system, endpoint.getObjectPath());
+        }
+        if (!queue.getSystem().isConnected(AS400.COMMAND)) {
+            LOG.debug("Connecting to {}", endpoint);
+            try {
+                queue.getSystem().connectService(AS400.COMMAND);
+            } catch (Exception e) {
+                throw RuntimeCamelException.wrapRuntimeCamelException(e);
+            }
+        }
+    }
+
+    @Override
+    public void stop() {
+        if (queue != null) {
+            LOG.debug("Releasing connection to {}", endpoint);
+            AS400 system = queue.getSystem();
+            queue = null;
+            endpoint.releaseSystem(system);
+        }
+    }
+    
+    /**
+     * Returns the message queue object that this service connects to. Returns
+     * {@code null} if the service is stopped.
+     * 
+     * @return the message queue object that this service connects to, or
+     *         {@code null} if stopped
+     */
+    public MessageQueue getMsgQueue() {
+        return queue;
+    }
+
+    @Override
+    public void close() throws IOException {
+        stop();
+    }
+}
diff --git 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Type.java
 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Type.java
index 257b462..67afa64 100644
--- 
a/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Type.java
+++ 
b/components/camel-jt400/src/main/java/org/apache/camel/component/jt400/Jt400Type.java
@@ -18,5 +18,5 @@ package org.apache.camel.component.jt400;
 
 public enum Jt400Type {
 
-    DTAQ, PGM, SRVPGM
+    DTAQ, PGM, SRVPGM, MSGQ
 }

Reply via email to