Sorry for posting this late.
Upgrading to CR2 of jboss-messaging solved the hang problem. I am posting the
working code anyway.
And these variables need to be set in build.properties.
jms.test.java.naming.provider.url = jnp://<server name>:1099
jms.test.java.naming.factory.initial = org.jnp.interfaces.NamingContextFactory
And the jboss client is a standalone program.
Thanks
Raghu
| import java.util.Date;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.Destination;
| import javax.jms.JMSException;
| import javax.jms.Message;
| import javax.jms.MessageConsumer;
| import javax.jms.MessageProducer;
| import javax.jms.ObjectMessage;
| import javax.jms.Session;
| import javax.jms.Topic;
| import javax.naming.InitialContext;
|
|
| import junit.framework.TestCase;
|
| public class TestJMS1 extends TestCase {
|
| Date date;
|
| static final String nameTopicConnFactory = "XAConnectionFactory";
| static final String nameTopic = "topic/testTopic";
|
| static final int CONSUMER_WAIT_TIME = 60000;
| static final int RELAYER_WAIT_TIME = 30000;
|
| public TestJMS1(String name) {
| super(name);
| }
|
| public void setUp() {
| }
|
| public void tearDown() {
| }
|
| public void testJMSFastPublisherSlowConsumerWithRelay() throws
Exception {
| String testName = "Publish/Subscribe with Slow Consumer and Relayer -
AUTO ACK ";
| ConnectionFactory connFactory = null;
| Connection connection = null;
| Session session = null;
| Destination topic = null;
| MyRelayerWithNewSession relayThread = null;
| MyConsumerWithNewSession consumerThread = null;
| WatchdogTimer watchdog = null;
|
| MessageProducer[] producers = new MessageProducer[1];
|
| try {
| InitialContext ic = new InitialContext ();
| //System.out.println ("Created InitialContext :: " + ic);
| String payload = "The static portion ";
|
| connFactory = (ConnectionFactory) ic.lookup (nameTopicConnFactory);
| topic = (Topic)ic.lookup(nameTopic);
| connection = connFactory.createConnection();
|
| session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
| producers[0] = session.createProducer(topic);
|
| relayThread = new MyRelayerWithNewSession(connection, topic,
"text0", "text1",
| RELAYER_WAIT_TIME, false, payload);
| consumerThread = new MyConsumerWithNewSession(connection, topic,
"text1",
| CONSUMER_WAIT_TIME, false, payload);
|
| connection.start();
|
| // watchdog = new WatchdogTimer(producers, consumers, 90000,
connection);
|
| //System.out.println("Sender starting");
| try {
| // For threads to create the sessions.
| Thread.sleep(5000);
| for(int i=0; i<10000; i++) {
| ObjectMessage message = session.createObjectMessage();
| message.setStringProperty("target", "text0");
| message.setObject(payload+i);
| producers[0].send(message);
| // System.out.println("Sent iter : " + i);
| // Thread.sleep(1000);
| if (i%100 == 0) {
| System.out.println("Sent " + i + "messages");
| }
| }
| } catch (Throwable t ) {
| System.out.println("Producer1 send got Error: "+t.getMessage());
| }
|
| System.out.println("Done with sending");
| Thread.sleep(200000);
| consumerThread.join();
| relayThread.join();
|
| } catch (Throwable t) {
| System.err.println("Error: "+t.getMessage());
| // t.printStackTrace(System.err);
| } finally {
| try {
| if (connection != null){
| connection.close();
| connection = null;
| }
| } catch (JMSException e) {
| e.printStackTrace();
| }
| }
|
| // if (watchdog.isAlive()) {
| // watchdog.interrupt();
| // }
| // watchdog.join();
|
| date = new Date();
| if (!watchdog.interrupted) {
| System.out.println(date.toString()+": "+testName + " : PASSED");
| } else {
| System.out.println(date.toString()+": "+testName + " : FAILED");
| }
| }
|
| class MyRelayerWithNewSession extends Thread {
| Connection connection = null;
| Destination topic = null;
| String localTarget = null;
| String relayTarget = null;
| int delay_ms = 0;
| boolean explicit_ack = false;
| String payload = null;
|
| Session consumeSession = null;
| Session produceSession = null;
|
| MessageConsumer consumer = null;
| MessageProducer producer = null;
|
| MyRelayerWithNewSession(Connection connection, Destination topic,
String localTarget,
| String relayTarget, int delay_ms, boolean explicit_ack, String
payload) {
| this.connection = connection;
| this.topic = topic;
| this.localTarget = localTarget;
| this.relayTarget = relayTarget;
| this.delay_ms = delay_ms;
| this.explicit_ack = explicit_ack;
| this.payload = payload;
| start();
| }
|
| public void run(){
|
| try {
| consumeSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
| produceSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
|
| consumer = consumeSession.createConsumer(topic,
"target='"+localTarget+"'", false);
| producer = produceSession.createProducer(topic);
|
| Thread.sleep(delay_ms);
| System.out.println("Relayer thread waking up");
| for (int i=0; i< 10000; i ++) {
| Message receivedMessage = consumer.receive();
| if (explicit_ack) {
| receivedMessage.acknowledge();
| }
| String receivedPayload = (String) ((ObjectMessage)
receivedMessage).getObject();
| // System.out.println("Relay Read at iter "+i+" : " +
receivedPayload);
| // System.out.println("Relay iter "+i+" :
"+receivedPayload.substring(19));
| // assertEquals(true, receivedPayload.equals(payload+i));
|
| ObjectMessage message = produceSession.createObjectMessage();
| message.setStringProperty("target", relayTarget);
| message.setObject(receivedPayload);
| producer.send(message);
|
| if (i%100 == 0) {
| System.out.println("Relayed " + i + "messages");
| }
| }
|
| System.out.println("Done with Relayer thread");
|
| } catch (Throwable t) {
| System.err.println("Relayer Thread encountered Error:
"+t.getMessage());
| // t.printStackTrace(System.err);
| }
| }
|
| }
|
| class MyConsumerWithNewSession extends Thread {
| Connection connection = null;
| Destination topic = null;
| String localTarget = null;
| int delay_ms = 0;
| boolean explicit_ack = false;
| String payload = null;
|
| Session consumeSession = null;
| MessageConsumer consumer = null;
|
| MyConsumerWithNewSession(Connection connection, Destination topic,
String localTarget,
| int delay_ms, boolean explicit_ack, String payload) {
| this.connection = connection;
| this.topic = topic;
| this.localTarget = localTarget;
| this.delay_ms = delay_ms;
| this.explicit_ack = explicit_ack;
| this.payload = payload;
| start();
| }
|
| public void run(){
| try {
| consumeSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
| consumer = consumeSession.createConsumer(topic,
"target='"+localTarget+"'", false);
|
| Thread.sleep(delay_ms);
| System.out.println("Receiver waking up");
| for (int i=0; i< 10000; i ++) {
| Message receivedMessage = consumer.receive();
| if (explicit_ack) {
| receivedMessage.acknowledge();
| }
| String receivedPayload = (String) ((ObjectMessage)
receivedMessage).getObject();
| // System.out.println("Receive Read at iter "+i+" : " +
receivedPayload);
| // System.out.println("Receive iter "+i+" :
"+receivedPayload.substring(19));
| // assertEquals(true, receivedPayload.equals(payload+i));
|
| if (i%100 == 0) {
| System.out.println("Received " + i + "messages");
| }
| }
|
| System.out.println("Done with Consumer thread");
|
| } catch (Throwable t) {
| System.err.println("Receiver Thread encountered Error:
"+t.getMessage());
| // t.printStackTrace(System.err);
| }
| }
| }
|
| static class WatchdogTimer extends Thread {
|
| long waitTime = 0L;
| boolean interrupted = false;
| Connection connection = null;
| MessageProducer[] producers = null;
| MessageConsumer [] consumers = null;
|
| public WatchdogTimer (MessageProducer[] producers, MessageConsumer []
consumers,
| long waitTime, Connection connection) {
| this.producers = producers;
| this.consumers = consumers;
| this.waitTime = waitTime;
| this.connection = connection;
| start();
| }
|
| public void run () {
|
| try {
| Thread.sleep(this.waitTime);
| this.interrupted = true;
|
| System.out.println("Watchdog waking up: closing producers and
consumers");
| for (int i = 0; i < this.consumers.length; i++) {
| this.consumers.close();
| }
| for (int i = 0; i < this.producers.length; i++) {
| this.producers.close();
| }
|
| connection.close();
| } catch (InterruptedException thrExc) {
| System.out.println("watchdog interrupted");
| } catch (JMSException jmsExc) {
| System.out.println("watchdog got jmsExc");
| }
|
| }
|
| }
|
| }
|
View the original post :
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3953539#3953539
Reply to the post :
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=3953539
Using Tomcat but need to do more? Need to support web services, security?
Get stuff done quickly with pre-integrated technology to make your job easier
Download IBM WebSphere Application Server v.1.0.1 based on Apache Geronimo
http://sel.as-us.falkag.net/sel?cmd=lnk&kid=120709&bid=263057&dat=121642
_______________________________________________
JBoss-user mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/jboss-user