Hi Tim,

First of all, I have a large portion of humble pie to eat - I hadn't clustered 
the SLSB that publishes. However, I'm still seeing unreliable results, so I'll 
elaborate on what I am doing...

I have a pojo that connects via JNDI to queue/testQueue on the 
ClusteredConnectionFactory. 


  |     private void startQueue() throws JMSException, NamingException {
  |             conn = ((ConnectionFactory) 
ctx.lookup("ClusteredConnectionFactory"))
  |                             .createConnection("guest", "guest");
  |             queue = (Queue) ctx.lookup("queue/testQueue");
  |             session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
  | 
  |             conn.start();
  |     }
  | 
It sends 1000 messages (say) of size 1Kb, that has an integer property 
'TopicID' that is a random number from 1 to 50, representing the index of the 
topic that the message will be forwarded to inside JBoss.

In my two node JBoss cluster, I have an EJB3 MDB that is listening to the queue 
configured thus:


  | @Clustered
  | @MessageDriven(
  |     messageListenerInterface=MessageListener.class,
  | activationConfig = {
  |     @ActivationConfigProperty(propertyName="destination", 
propertyValue="queue/testQueue"),
  |     @ActivationConfigProperty(propertyName="destinationType", 
propertyValue="javax.jms.Queue")
  | })
  | public class OamServerPushImpl implements OamServerPush, MessageListener
  | 

This MDB simply forwards onto a SLSB configured as follows, which publishes the 
message to the topic indicated by the TopicID property :


  | @Clustered
  | @Stateless
  | public class PublisherImpl implements Publisher  {
  |     @Resource(mappedName="ClusteredConnectionFactory")
  |     ConnectionFactory topicConnectionFactory;
  | 
  |     @Resource
  |     private SessionContext sc;
  | 
  |     Topic topic;
  | 
  |     public void redispatch( Message m, String sReceiverAddress, int 
iMessageID) {
  |         Session session = null;
  |         TextMessage message = null;
  | 
  |         try {
  |             int iTopicID = m.getIntProperty("TopicID");
  |             String sPayload = m.getStringProperty("Payload");
  |                 connection = topicConnectionFactory.createConnection();
  |                 session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
  | //              connection.stop();
  |                 connection.start();
  | 
  |                 message = session.createTextMessage();
  | 
  |                 message.setText("Redispatching : " + sPayload);
  |                 message.setStringProperty("ReceivedBy", sReceiverAddress);
  |                 message.setStringProperty("PublishedBy", 
InetAddress.getLocalHost().getHostAddress());
  |                 message.setIntProperty("MessageID", iMessageID);
  |                     sendToClients(session, message, iTopicID);
  |                     connection.close();
  |         } catch (Throwable t) {
  |             // JMSException could be thrown
  |             logger.warn(
  |                     "PublisherBean.redispatch: " + "Exception: "
  |                     + t.toString());
  |             sc.setRollbackOnly();
  |         } finally {
  |             if (session != null) {
  |                 try {
  |                     session.close();
  |                 } catch (JMSException e) {
  |                 }
  |             }
  |         }
  |     }
  | 
  |     public void sendToClients(Session session, Message message, int 
iTopicID)
  |                     throws JMSException {
  |             topic = (Topic) sc.lookup("topic/testDurableTopic" + iTopicID);
  |         MessageProducer publisher = session.createProducer(topic);
  |             publisher.send(message);
  |             publisher.close();
  |         logger.info("Publisher BEAN: Message published: (" +
  |                 ((TextMessage)message).getText().length() +") bytes");
  |     }
  | 


I then have another 50 pojos outside the container each listening to one topic 
via a durable subscription


  | public class MessageSubscriberClient implements Runnable {
  |     private static final Logger log = 
Logger.getLogger(MessageSubscriberClient.class);
  | 
  |     int iCount = 0;
  |     Connection conn;
  |     Session session;
  |     TopicSubscriber subscriber;
  |     Topic topic;
  |     Context ctx;
  |     int i;
  |     boolean bFinished = false;
  | 
  |     static int NUMBER_OF_MESSAGES = 1000;
  | 
  |     static final int NUMBER_OF_TOPICS = 50;
  | 
  | 
  |     public MessageSubscriberClient(Context ctx, int i) {
  |             this.i = i;
  |             this.ctx = ctx;
  | 
  |             try {
  |                     startTopic();
  |             }
  |             catch (JMSException e1) {
  |                     log.error(e1.getMessage(), e1);
  |             }
  |             catch (NamingException e) {
  |                     log.error(e.getMessage(), e);
  |             }
  | 
  |     }
  | 
  | 
  |     public void run() {
  |             try {
  |                     receive();
  |             } catch (JMSException e) {
  |                     log.error(e.getMessage(), e);
  |             } catch (NamingException e) {
  |                     log.error(e.getMessage(), e);
  |             }
  |             log.warn("Stopping topic on " + i);
  |             stopTopic();
  |     }
  | 
  |     public void receive() throws JMSException, NamingException {
  |             while ( ! bFinished) {
  |                     try {
  |                             if (subscriber != null) {
  |                                     process();
  |                             }
  |                             else {
  |                                         // try to recover
  |                                     log.info(i + " not well - subscriber 
null");
  | //                                  stopTopic();
  | //                                  startTopic();
  |                             }
  |                     } catch (JMSException e) {
  |                             // try to recover
  | //                     stopTopic();
  | //                     startTopic();
  |                     }
  |             }
  |             log.info("Finished receiving on " + i);
  | 
  |     }
  | 
  | 
  |     public int getNumReceivedMessages() {
  |             return iCount;
  |     }
  | 
  |     private void startTopic() throws JMSException, NamingException {
  |             conn = (Connection) ((ConnectionFactory) ctx
  |                             
.lookup("ClusteredConnectionFactory")).createConnection("john" + i, "needle");
  | 
  |             topic = (Topic) ctx.lookup("topic/testDurableTopic" + i);
  |             session = conn.createSession(false,
  |                             TopicSession.AUTO_ACKNOWLEDGE);
  | 
  |             conn.stop();
  | 
  |             // subscription
  |             subscriber = session.createDurableSubscriber(topic, "finbar" + 
i);
  | 
  |             conn.start();
  | 
  |     }
  | 
  |     private void stopTopic() {
  |             log.info("In Stopping topic on " + i);
  |             try {
  |                     if (subscriber != null) {
  |                             subscriber.close();
  |                             log.info("finbar" + i + " closed");
  |                     }
  |             } catch (JMSException e) {
  |                     log.error(e.getMessage(), e);
  |             }
  |             try {
  |                     if (session != null) {
  |                             session.unsubscribe("finbar" + i);
  |                             log.info("finbar" + i + " unsubscribed");
  |                     }
  |             } catch (JMSException e) {
  |                     log.error(e.getMessage(), e);
  |             }
  |             try {
  |                     if (conn != null)
  |                             conn.close();
  |             } catch (JMSException e) {
  |                     log.error(e.getMessage(), e);
  |             }
  |     }
  | 
  |     public void setFinished( boolean b) {
  |             bFinished = b;
  |     }
  | 
  |     public void process() throws JMSException {
  |             Message m = subscriber.receive(1000);
  |             TextMessage t = (TextMessage) m;
  |             if (t != null) {
  |                     m.acknowledge();
  |                     iCount++;
  |             }
  |     }
  | }
  | 

In the receive method above, I've indicated where I've tried to detect receiver 
problems and recover by stopping and stating the topic.

I then run the receiver and sender code, wait until messages happily being sent 
and received, and then kill the JBoss node (kill -9) that is processing the 
messages. 

With the receiver code as-is above (not trying to recover), the threads just 
sit in the while loop reporting


  | 34864 [Thread-94] ERROR org.jboss.jms.client.container.ClosedInterceptor  - 
ClosedInterceptor.ClientConsumerDelegate[e9-cs3ijw9f-1-64tmiw9f-c5iuxj-r55ss4]: 
method receive() did not go through, the interceptor is CLOSED
  | javax.jms.IllegalStateException: The object is closed
  |     at 
org.jboss.jms.client.container.ClosedInterceptor.invoke(ClosedInterceptor.java:157)
  |     at 
org.jboss.aop.advice.PerInstanceInterceptor.invoke(PerInstanceInterceptor.java:105)
  |     at 
org.jboss.jms.client.delegate.ClientConsumerDelegate$receive_N8299950230150603585.invokeNext(ClientConsumerDelegate$receive_N8299950230150603585.java)
  |     at 
org.jboss.jms.client.delegate.ClientConsumerDelegate.receive(ClientConsumerDelegate.java)
  |     at 
org.jboss.jms.client.JBossMessageConsumer.receive(JBossMessageConsumer.java:86)
  |     at 
com.ipaccess.MessageSubscriberClient.process(MessageSubscriberClient.java:154)
  |     at 
com.ipaccess.MessageSubscriberClient.receive(MessageSubscriberClient.java:78)
  |     at 
com.ipaccess.MessageSubscriberClient.run(MessageSubscriberClient.java:64)
  |     at java.lang.Thread.run(Thread.java:595)
  | 

If on the other hand I try to recover the topic by stopping and starting it, I 
get much 'better' results (I sometimes receive all messages), but more llikely 
I find that some messages get stuck in JBoss. The receivers will sit there 
listening whilst JBoss client side code reports the failover (with a lot of 
jboss remoting logging that I realise is an outstanding JIRA task)


  | 294287 [Timer-0] WARN org.jboss.remoting.LeasePinger  - 
LeasePinger[SocketClientInvoker[1c0f2e5, 
bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed to 
ping to server: Can not get connection to server. Problem establishing socket 
connection for InvokerLocator 
[bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize
  | =200&clientSocketClass=org.jboss.jms.client.remoting.ClientSocketWrapper&
  | dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&numberOf
  | 
CallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&
  | 
socket.check_connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
  | 294584 [Thread-277] WARN org.jboss.remoting.LeasePinger  - 
LeasePinger[SocketClientInvoker[1c0f2e5, 
bisocket://172.28.5.208:4457](4ss15g-cvnwhk-f9wjxoka-1-f9wjxoll-3)] failed 
sending disconnect for client lease for client with session ID 
4ss15g-cvnwhk-f9wjxoka-1-f9wjxqil-1t
  | 

and 


  | - unable to get secondary locator
  | org.jboss.remoting.CannotConnectException: Can not get connection to 
server. Problem establishing socket connection for InvokerLocator 
[bisocket://172.28.5.208:4457/?clientLeasePeriod=10000&clientMaxPoolSize=200&clientSocketClass=org.jboss.
  | 
jms.client.remoting.ClientSocketWrapper&dataType=jms&marshaller=org.jboss.jms.wireformat.JMSWireFormat&
  | 
numberOfCallRetries=1&numberOfRetries=10&pingFrequency=214748364&pingWindowFactor=10&socket.check_
  | 
connection=false&timeout=0&unmarshaller=org.jboss.jms.wireformat.JMSWireFormat]
  |     at 
org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:532)
  |     at 
org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:413)
  |     at 
org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
  |     at 
org.jboss.remoting.transport.bisocket.BisocketClientInvoker.getSecondaryLocator(BisocketClientInvoker.java:538)
  |     at 
org.jboss.remoting.transport.bisocket.BisocketServerInvoker.createControlConnection(BisocketServerInvoker.java:228)
  |     at 
org.jboss.remoting.transport.bisocket.BisocketClientInvoker.transport(BisocketClientInvoker.java:402)
  |     at 
org.jboss.remoting.MicroRemoteClientInvoker.invoke(MicroRemoteClientInvoker.java:122)
  |     at org.jboss.remoting.Client.invoke(Client.java:1634)
  |     at org.jboss.remoting.Client.addCallbackListener(Client.java:1703)
  |     at org.jboss.remoting.Client.addListener(Client.java:921)
  |     at 
org.jboss.jms.client.remoting.JMSRemotingConnection.addInvokerCallbackHandler(JMSRemotingConnection.java:237)
  |     at 
org.jboss.jms.client.remoting.JMSRemotingConnection.start(JMSRemotingConnection.java:312)
  |     at 
org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate.establishCallback(ClientClusteredConnectionFactoryDelegate.java:99)
  |     at 
org.jboss.jms.client.remoting.ConnectionFactoryCallbackHandler$CallbackConnectionListener.handleConnectionException(ConnectionFactoryCallbackHandler.java:105)
  |     at 
org.jboss.remoting.ConnectionValidator$1.run(ConnectionValidator.java:452)
  | Caused by: java.net.ConnectException: Connection refused: connect
  |     at java.net.PlainSocketImpl.socketConnect(Native Method)
  |     at java.net.PlainSocketImpl.doConnect(PlainSocketImpl.java:333)
  |     at java.net.PlainSocketImpl.connectToAddress(PlainSocketImpl.java:195)
  |     at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:182)
  |     at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:366)
  |     at java.net.Socket.connect(Socket.java:519)
  |     at 
org.jboss.remoting.transport.socket.SocketClientInvoker.createSocket(SocketClientInvoker.java:187)
  |     at 
org.jboss.remoting.transport.bisocket.BisocketClientInvoker.createSocket(BisocketClientInvoker.java:420)
  |     at 
org.jboss.remoting.transport.socket.MicroSocketClientInvoker.getConnection(MicroSocketClientInvoker.java:815)
  |     at 
org.jboss.remoting.transport.socket.MicroSocketClientInvoker.transport(MicroSocketClientInvoker.java:525)
  |     ... 14 more
  | 

eventually (over an hour later) , most receivers reconnect, but some never do. 
I see


  | 1562035 [Thread-313] ERROR org.jboss.remoting.MicroRemoteClientInvoker  - 
error shutting down lease pinger
  | 

and then it all goes quiet - no more logging. Despite leaving the 
sender/receivers and one JBoss node up for well over and hour, no more messages 
get through.

I wasn't expecting to have to stop and start the topics on the receivers, but 
it looks like I have to. But even if I do, it is probable that some receivers 
do not failover.


Other info...

I execute sender and receivers from JUnit, but I think putting it in this post 
is unnecessary

Inside JBoss, the destinations are configured like so


  | 
  |    <mbean code="org.jboss.jms.server.destination.QueueService"
  |       name="jboss.messaging.destination:service=Queue,name=testQueue"
  |       xmbean-dd="xmdesc/Queue-xmbean.xml">
  |       <depends 
optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  |       <depends>jboss.messaging:service=PostOffice</depends>
  |       <attribute name="SecurityConfig">
  |       <attribute name="Clustered">true</attribute>
  |          <security>
  |             <role name="guest" read="true" write="true"/>
  |             <role name="publisher" read="true" write="true" create="false"/>
  |             <role name="noacc" read="false" write="false" create="false"/>
  |          </security>
  |       </attribute>
  |    </mbean>
  | 
  |   <!-- Repeat for testDurableTopic0 to testDurableTopic49 -->
  |   <mbean code="org.jboss.jms.server.destination.TopicService"       
name="jboss.messaging.destination:service=Topic,name=testDurableTopic0"
  |       xmbean-dd="xmdesc/Topic-xmbean.xml">
  |       <depends 
optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
  |       <depends>jboss.messaging:service=PostOffice</depends>
  |       <attribute name="Clustered">true</attribute>
  |     <attribute name="SecurityConfig">
  |        <security>
  |          <role name="guest" read="true" write="true"/>
  |          <role name="publisher" read="true" write="true" create="false"/>
  |          <role name="durpublisher" read="true" write="true" create="true"/>
  |       </security>
  |     </attribute>
  |   </mbean>
  | 

I have a MySQl-persistence-service.xml with the appropriate clustered attribute 
and SQL to set up the users, passwords, roles and ClientIDs 


  | <attribute name="Clustered">true</attribute>
  | ...
  | POPULATE.TABLES.69 = INSERT INTO JBM_USER (USER_ID,PASSWD,CLIENTID) VALUES 
('john49', 'needle', 'DurableSubscriberExample49')
  | POPULATE.TABLES.119 = INSERT INTO JBM_ROLE (ROLE_ID, USER_ID) VALUES 
('durpublisher','john49')
  | 



and I set ServerPeerID to 0 and 1 respectively in messaging-service.xml for the 
two nodes.

JBoss is running on RHEL 4 JRE 1.5.0_12
JBoss 4.2.2 GA with JBM 1.4.0.SP1 configured as per documentation.
Pojo sender and receiver are running on WinXP JRE 1.5.0_12.



View the original post : 
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4111165#4111165

Reply to the post : 
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4111165
_______________________________________________
jboss-user mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/jboss-user

Reply via email to