Hi,

I am using JBoss 4.2.3.GA with 1.4.0.SP3.  I upgraded both in hopes to fix the 
following issue.

In the event that a standalone jms client is connected to multiple JBoss AS via 
the ClusterConnectionFactory and a server dies, the client will failover to the 
remaining servers.

The problem I am trying to solve is if you have 1 server in the cluster at the 
time.  I have found that using an exception listener works reliably if you are 
using ConnectionFactory but not for ClusteredConnectionFactory.

If ConnectionFactory is looked up, the connection is reestablished if the 
server goes down and comes back.  If ClusteredConnectionFactory is used, the 
connection is reestablished the first time the single server dies but fails on 
the second attempt.

Here is the code:


public class JMSConnection implements Runnable {
    protected static Logger log = Logger.getLogger(JMSConnection.class);
    protected static AtomicInteger msgcount = new AtomicInteger();
    private Session session;
    private Queue queue;
    private String dest;
    private BlockingQueue bq;
    private final static ScheduledExecutorService scheduler = Executors
            .newScheduledThreadPool(1);
    private Connection connection = null;
    protected static String sourceName; // the name of the client
    final static int NUM_RETRIES = 1000000000;
    final static int RETRY_INTERVAL = 1000;
    private long id = System.nanoTime();
    volatile boolean doSend = true;
    
    ConnectionFactory connectionFactory;
    MessageProducer producer;

    static {

        /* The delay between */
        final long delay = 1;
        /* The period to send */
        final long period = 1;

        /*
         * Set logging level to debug to have the scheduler report the number of
         * messages being sent by each thread
         */
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                if (log.isInfoEnabled()) {
                    // if (connected) {
                    log.info(sourceName + ": sent " + msgcount + " per "
                            + period + " seconds.");
                    msgcount.set(0);
                    // } else {
                    // log.info("Problem with JMS Connection from "
                    // + sourceName
                    // + "! Please check jboss application server");
                    // }
                }
            }
        }, delay, period, TimeUnit.SECONDS);
    }

    /**
     * // * // * @param dest // * The name of the destination // * @param bq //
     * * The blocking queue // * @param m_sourceName // * The source name // * 
@throws
     * Exception //
     */
    public JMSConnection(String dest, BlockingQueue bq,
            String m_sourceName) throws Exception {
        this.dest = dest;
        this.bq = bq;
        sourceName = m_sourceName;
        init();
        log.info("JMS Connection");
    }

    /**
     * // * // * @throws Exception //
     */
    private void init() throws Exception {

        setUpJMS();

    }

    public boolean setUpJMS() {
        InitialContext ic;
        try {
            ic = new InitialContext();
            connectionFactory = (ConnectionFactory) ic
                    .lookup("ClusteredConnectionFactory");
            queue = (Queue) ic.lookup("queue/RouteManagerQueue");
            connection = connectionFactory.createConnection();
            try {
                log.debug("Connection created ...");

                // KEY - register for exception callbacks
                connection.setExceptionListener(new ExceptionListenerImpl());

                session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                log.debug("Session created ...");
                producer = session.createProducer(queue);

                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                log.debug("Producer created ...");

                return true;
            } catch (Exception e) {
                // We failed so close the connection
                try {
                    connection.close();
                } catch (JMSException ignored) {
                    // Pointless
                }
                // Rethrow the initial problem to where we will log it
                throw e;
            } finally {
                // And close the initial context
                // We don't want to wait for the garbage collector to close it
                // otherwise we'll have useless hanging network connections
                ic.close();
            }
        } catch (Exception e) {
            log.error("Error setting up JMS", e);
            return false;
        }
    }

    public void run() {
        int cnt = 0;
        while (doSend) {
            try {
                 Serializable s = bq.take();
                ObjectMessage message = session.createObjectMessage();
                message.setObject(s);
                producer.send(message);
                msgcount.incrementAndGet();
                log.info("Sending a attribute from " + id);
            } catch (Exception e) {
                log.error("Problem sending message " + e.getMessage(), e);
            }
        }
    }

    private class ExceptionListenerImpl implements ExceptionListener {
        public void onException(JMSException e) {

            for (int i = 0; i < NUM_RETRIES; i++) {
                log
                        .warn("Connection has problems, trying to re-create it, 
attempt "
                                + (i + 1) + " ...");

                try {
                    connection.close(); // unregisters the ExceptionListener
                } catch (Exception e2) {
                    // I will get an Exception anyway, since the connection to
                    // the server is
                    // broken, but close() frees up resources associated with
                    // the connection
                }

                boolean setupOK = setUpJMS();

                if (setupOK) {
                    log.info("Connection re-established");
                    return;
                } else {
                    log.warn("Re-creating connection failed, retrying ...");
                }
            }

            log.error("Cannot re-establish connection, giving up ...");
            doSend = false;
        }
    }

I am using the remoting jar that comes with 4.2.3.GA, and the messaging-cleint 
jar that comes with 1.4.0.SP3

Thanks in advance,

View the original post : 
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4181558#4181558

Reply to the post : 
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4181558
_______________________________________________
jboss-user mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/jboss-user

Reply via email to