Cheers. I'm working with Bob on this issue, and we've found something while
keeping the connection, session, and consumer open in our receiver. Below is
the code for our sender and our receiver. The receiver keeps polling the JMS
queue while the queue is stopped and started, and also while the JBoss server
is shut down and restarted. On a queue stop/start, the receiver continues to
act as if it is actually trying to get messages; it does not report an error.
When the queue comes back up, the receiver receives any new messages that
arrive after the restart. On a JBoss restart, the receiver behaves much the
same, with no notification of the loss of connectivity, but after the restart
the receiver does NOT receive any new messages that arrive. Any ideas on how
we can make the receiver realize the server is down so that it can handle
reconnecting?
Sender code:
| package com.valpak.renderingservice.indesign.unittests;
|
| import java.util.Properties;
|
| import javax.jms.DeliveryMode;
| import javax.jms.JMSException;
| import javax.jms.ObjectMessage;
| import javax.jms.Queue;
| import javax.jms.QueueConnection;
| import javax.jms.QueueConnectionFactory;
| import javax.jms.QueueReceiver;
| import javax.jms.QueueSender;
| import javax.jms.QueueSession;
| import javax.naming.Context;
| import javax.naming.InitialContext;
| import javax.naming.NamingException;
|
| import org.apache.log4j.Logger;
|
| /**
|  * Test client that sends one message to the JMS queue every ten seconds
|  * and waits up to three seconds for a reply on a temporary queue.
|  * Each send opens and closes its own JNDI context and JMS connection.
|  */
| public class JMSSender {
|     // The logger previously used JMSReceiver.class, which filed sender
|     // output under the receiver's log category.
|     private static final Logger log = Logger.getLogger(JMSSender.class);
|     // Placeholders from the original post; replace with real values.
|     private String jmsServer = <jms queue server URL>;
|     private String jmsQueue = <queue name>;
|
|     /**
|      * Sends a message every ten seconds until the thread is interrupted.
|      *
|      * @param args unused
|      */
|     public static void main(String[] args) {
|         JMSSender jmss = new JMSSender();
|         try {
|             while (true) {
|                 log.debug("Sending a message");
|                 jmss.sendMessage();
|                 Thread.sleep(10000);
|             }
|         } catch (InterruptedException e) {
|             // Restore the flag so code further up can see the interrupt.
|             Thread.currentThread().interrupt();
|             log.warn("Sender interrupted; stopping", e);
|         }
|     }
|
|     /**
|      * Looks up the queue, sends one non-persistent ObjectMessage with a
|      * temporary reply queue, and logs a String reply if one arrives
|      * within the wait period. Failures are logged, not rethrown.
|      */
|     private void sendMessage() {
|         InitialContext ctx = null;
|         QueueConnection conn = null;
|         QueueSession session = null;
|         QueueSender sender = null;
|         QueueReceiver receiver = null;
|         int replyWaitMS = 3000;
|
|         try {
|             Properties properties = new Properties();
|             properties.put(Context.INITIAL_CONTEXT_FACTORY,
|                     "org.jnp.interfaces.NamingContextFactory");
|             properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
|             properties.put(Context.PROVIDER_URL, jmsServer);
|             ctx = new InitialContext(properties);
|
|             QueueConnectionFactory tcf =
|                     (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
|             conn = tcf.createQueueConnection();
|             Queue queue = (Queue) ctx.lookup(jmsQueue);
|             conn.start();
|             session = conn.createQueueSession(false,
|                     QueueSession.AUTO_ACKNOWLEDGE);
|             sender = session.createSender(queue);
|             sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
|             String messageBody = "You're an idiot!";
|             ObjectMessage message = session.createObjectMessage(messageBody);
|             Queue tempQueue = session.createTemporaryQueue();
|             if (tempQueue != null) {
|                 message.setJMSReplyTo(tempQueue);
|                 sender.setTimeToLive(replyWaitMS * 20);
|             } else {
|                 sender.setTimeToLive(60 * 60 * 1000); // 60 minutes
|             }
|             sender.send(message);
|
|             if (tempQueue != null) {
|                 receiver = session.createReceiver(tempQueue);
|                 Object responseMessage = receiver.receive(replyWaitMS);
|                 // Check the type before casting; a reply of another
|                 // message type used to raise ClassCastException here.
|                 if (responseMessage instanceof ObjectMessage) {
|                     Object responseObject =
|                             ((ObjectMessage) responseMessage).getObject();
|                     if (responseObject instanceof String) {
|                         log.debug(responseObject);
|                     }
|                 } else if (responseMessage != null) {
|                     log.warn("Ignoring unexpected reply: " + responseMessage);
|                 }
|             }
|         } catch (NamingException e) {
|             log.error("JNDI lookup failed", e);
|         } catch (JMSException e) {
|             log.error("JMS send/receive failed", e);
|         } finally {
|             close(conn, session, receiver, sender);
|             closeContext(ctx);
|         }
|     }
|
|     /**
|      * Closes the given JMS resources, ignoring nulls. Children are closed
|      * before their parents (receiver and sender, then session, then
|      * connection); each close is attempted even if an earlier one fails.
|      */
|     private void close(QueueConnection conn, QueueSession session,
|             QueueReceiver receiver, QueueSender sender) {
|         try {
|             if (receiver != null) {
|                 receiver.close();
|             }
|         } catch (JMSException e) {
|             log.warn("Failed to close receiver", e);
|         }
|         try {
|             if (sender != null) {
|                 sender.close();
|             }
|         } catch (JMSException e) {
|             log.warn("Failed to close sender", e);
|         }
|         try {
|             if (session != null) {
|                 session.close();
|             }
|         } catch (JMSException e) {
|             log.warn("Failed to close session", e);
|         }
|         try {
|             if (conn != null) {
|                 conn.close();
|             }
|         } catch (JMSException e) {
|             log.warn("Failed to close connection", e);
|         }
|     }
|
|     /** Closes the JNDI context if it was created; failures are logged. */
|     private void closeContext(InitialContext ctx) {
|         try {
|             if (ctx != null) {
|                 ctx.close();
|             }
|         } catch (NamingException e) {
|             log.warn("Failed to close JNDI context", e);
|         }
|     }
| }
|
And the receiver code:
| package com.valpak.renderingservice.indesign.unittests;
|
| import java.io.Serializable;
| import java.util.Properties;
|
| import javax.jms.DeliveryMode;
| import javax.jms.Destination;
| import javax.jms.JMSException;
| import javax.jms.MessageProducer;
| import javax.jms.ObjectMessage;
| import javax.jms.Queue;
| import javax.jms.QueueConnection;
| import javax.jms.QueueConnectionFactory;
| import javax.jms.QueueReceiver;
| import javax.jms.QueueSession;
| import javax.naming.Context;
| import javax.naming.InitialContext;
| import javax.naming.NamingException;
|
| import org.apache.log4j.Logger;
|
| public class JMSReceiver {
| private static Logger log = Logger.getLogger(JMSReceiver.class);
| private String jmsServer = <jms server URL>;
| private String jmsQueue = <queue name>;
| private int queueWaitSeconds = 3;
| InitialContext ctx = null;
| QueueConnection conn = null;
| QueueConnectionFactory tcf = null;
| Queue queue = null;
| QueueSession session = null;
| QueueReceiver consumer = null;
|
| public static void main(String[] args) {
| // Check for messages
| JMSReceiver jmsr = new JMSReceiver();
| try {
| jmsr.initialize();
| while (true) {
| ObjectMessage message = jmsr.checkQueue();
| if (message != null) {
| if (message.getObject() instanceof
String) {
| // Output what we got
| log.debug(message.getObject());
| // Send a reply
| String reply = "I know you are,
but what am I?";
|
jmsr.sendReplyMessage(message.getJMSReplyTo(), reply);
| }
| } else {
| log.debug("No message. Sleeping");
| Thread.sleep(6000);
| }
| }
| } catch (Exception e) {
| e.printStackTrace();
| }
|
| }
|
| private void initialize() throws Exception {
| Properties properties = new Properties();
| properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
| properties.put(Context.URL_PKG_PREFIXES, "org.jnp.interfaces");
| properties.put(Context.PROVIDER_URL, jmsServer);
| ctx = new InitialContext(properties);
| tcf = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
| conn = tcf.createQueueConnection();
| queue = (Queue) ctx.lookup(jmsQueue);
| conn.start();
| session = conn.createQueueSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
| consumer = session.createReceiver(queue);
| }
|
| private ObjectMessage checkQueue() {
|
| try {
| ObjectMessage message = (ObjectMessage)
consumer.receive(queueWaitSeconds * 1000);
| if (message != null) {
| log.debug("got a message");
| return message;
| } else {
| log.debug("No message found for " +
queueWaitSeconds + " seconds");
| return null;
| }
| } catch (Exception e) {
| e.printStackTrace();
| } finally {
|
| }
| return null;
| }
|
| private void close(QueueConnection conn, QueueSession session,
QueueReceiver receiver, MessageProducer producer) {
| try {
| if (conn != null) {
| conn.close();
| }
| } catch (JMSException e) {
| e.printStackTrace();
| }
| try {
| if (session != null) {
| session.close();
| }
| } catch (JMSException e) {
| e.printStackTrace();
| }
| try {
| if (receiver != null) {
| receiver.close();
| }
| } catch (JMSException e) {
| e.printStackTrace();
| }
| try {
| if (producer != null) {
| producer.close();
| }
| } catch (JMSException e) {
| e.printStackTrace();
| }
| }
|
| private void sendReplyMessage(Destination replyTo, Serializable reply) {
| InitialContext ctx = null;
| QueueConnection conn = null;
| QueueConnectionFactory tcf = null;
| QueueSession session = null;
| MessageProducer producer = null;
|
| Properties properties = new Properties();
| try {
| properties.put(Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");
| properties.put(Context.URL_PKG_PREFIXES,
"org.jnp.interfaces");
| properties.put(Context.PROVIDER_URL, jmsServer);
| ctx = new InitialContext(properties);
| tcf = (QueueConnectionFactory)
ctx.lookup("ConnectionFactory");
| conn = tcf.createQueueConnection();
| conn.start();
| session = conn.createQueueSession(false,
QueueSession.AUTO_ACKNOWLEDGE);
| producer = session.createProducer(replyTo);
| ObjectMessage response =
session.createObjectMessage(reply);
|
response.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
| producer.send(response);
| } catch (NamingException e) {
| e.printStackTrace();
| } catch (JMSException e) {
| e.printStackTrace();
| } finally {
| close(conn, session, null, producer);
| }
| }
|
| }
|
View the original post :
http://www.jboss.org/index.html?module=bb&op=viewtopic&p=4255824#4255824
Reply to the post :
http://www.jboss.org/index.html?module=bb&op=posting&mode=reply&p=4255824
_______________________________________________
jboss-user mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/jboss-user