Bob Muller [http://community.jboss.org/people/poesys] created the discussion

"Only one subscriber sees a topic message in a clustered topic setup"

To view the discussion, visit: http://community.jboss.org/message/595433#595433

--------------------------------------------------------------
I have created a straightforward Topic setup on a two-node JBoss 5.1 cluster
(using the "all" server configuration with the Messaging defaults). Here is the
topic configuration I added to destinations-service.xml on all nodes:

   <mbean
      code="org.jboss.jms.server.destination.TopicService"
      name="jboss.messaging.destination:service=Topic,name=PoesysCacheDelete"
      xmbean-dd="xmdesc/Topic-xmbean.xml">
      <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
      <depends>jboss.messaging:service=PostOffice</depends>
   </mbean>


Our web app has a simple POJO cache (a Java Map) that holds data objects
queried from a database. Each node has its own separate singleton cache. I am
using messaging to have the cache on each node remove an object on demand. So
the behavior I want is:

1. The user updates an object.
2. The database transaction happens.
3. The data access code sends a message to the topic to remove the object from 
the cache (the payload is the key in the Map).
4. The registered listeners (MessageListener implementations) in the instance 
of the web app running on each node receive the published topic message and 
respond by removing the cached object as requested.
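
To make the intended behavior concrete, here is a minimal in-memory sketch of
steps 3 and 4. It does not use JMS or JBoss Messaging at all; LocalTopic,
CacheNode, and TopicFanOutSketch are hypothetical names invented for this
illustration, and the String key stands in for the real primary key payload.
It only shows the semantics I am expecting from a topic: one published delete
request reaches the listener on every node.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a topic: every subscriber gets every message. */
class LocalTopic {
  interface Subscriber {
    void onMessage(String key);
  }

  private final List<Subscriber> subscribers = new ArrayList<Subscriber>();

  void subscribe(Subscriber subscriber) {
    subscribers.add(subscriber);
  }

  /** Delivers the key to all subscribers, not just the first one. */
  void publish(String key) {
    for (Subscriber subscriber : subscribers) {
      subscriber.onMessage(key);
    }
  }
}

/** Hypothetical application node holding its own local cache. */
class CacheNode implements LocalTopic.Subscriber {
  final Map<String, String> cache = new HashMap<String, String>();

  @Override
  public void onMessage(String key) {
    // Remove the object from the local cache only.
    cache.remove(key);
  }
}

public class TopicFanOutSketch {
  /** Returns how many nodes still hold the key after one delete message. */
  static int staleNodesAfterDelete(int nodeCount, String key) {
    LocalTopic topic = new LocalTopic();
    List<CacheNode> nodes = new ArrayList<CacheNode>();
    for (int i = 0; i < nodeCount; i++) {
      CacheNode node = new CacheNode();
      node.cache.put(key, "cached value");
      topic.subscribe(node);
      nodes.add(node);
    }

    // Step 3: the data access code publishes the key once.
    topic.publish(key);

    // Step 4: count the nodes whose listener did not remove the object.
    int stale = 0;
    for (CacheNode node : nodes) {
      if (node.cache.containsKey(key)) {
        stale++;
      }
    }
    return stale;
  }

  public static void main(String[] args) {
    System.out.println("Stale nodes: " + staleNodesAfterDelete(2, "customer:42"));
  }
}
```

With this fan-out behavior, no node is left holding the object after a single
publish, which is what I want from the clustered topic.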

This works fine as long as I have only one node; I'm logging all this and I see
the message sent and received and the cached object removed.

However, when I have two nodes, it stops working. Looking at the logs, it
appears that when one node receives a message and acts on it, the other
node(s) never see that message. Since I want the removal to happen on all
nodes, this means the caches get out of sync. In particular, with the
sticky-session setup, the originating/publishing node sees only some of the
messages and may or may not remove the object from its own cache, which then
causes the next page to display stale data.

This looks an awful lot like a race condition between the nodes. My 
understanding was (and seems to be verified by looking through the topics on 
this discussion forum) that a clustered topic means that ALL listeners get the 
message. I can verify that isn't true here--the instant a message is received 
on one node, the other node(s) don't see it.

I did verify that the mbean is clustered by examining it in the jmx-console for 
the nodes.

This is the Java code that implements MessageListener:


/**
 * A thread-based class that listens for messages about the Poesys/DB cache.
 * 
 * @author Robert J. Muller
 */
public class CacheMessageListener implements Runnable, MessageListener {
 
  /**
   * Logger for this class
   */
  private static final Logger logger =
    Logger.getLogger(CacheMessageListener.class);
 
  private static final String LISTENER_MSG =
    "com.poesys.db.dao.msg.listener_problem";
  private static final String DELETE_MSG =
    "com.poesys.db.dao.msg.delete_problem";
  private static final String INTERRUPT_MSG =
    "com.poesys.db.dao.msg.interrupted";
 
  /** JMS topic name for the Poesys/DB delete topic */
  public static final String DELETE_TOPIC = "topic/PoesysCacheDelete";
  /** JMS connection factory name */
  public static final String CONNECTION_FACTORY = "ClusteredConnectionFactory";
  /** JMS ObjectMessage property name for cache name property */
  public static final String CACHE_NAME_PROPERTY = "CacheName";
 
  private Connection connection;
  private Session sessionConsumer;
  private MessageConsumer consumer;
 
  /**
   * Runs the message listener.
   */
  public void run() {
    try {
      // Look up the connection factory using JNDI.
      Context initial = new InitialContext();
      ConnectionFactory cf =
        (ConnectionFactory)initial.lookup(CONNECTION_FACTORY);
 
      // Set this object to be a message listener for delete requests.
      Destination deleteTopic = (Destination)initial.lookup(DELETE_TOPIC);
      connection = cf.createConnection();
      sessionConsumer =
        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      consumer = sessionConsumer.createConsumer(deleteTopic);
      consumer.setMessageListener(this);
      connection.start();
 
      logger.info("Cache message listener started, listening for cache "
                  + "removal requests");
 
      // Sleep indefinitely until interruption.
      while (!Thread.currentThread().isInterrupted()) {
        // Sleeps for 10 seconds
        Thread.sleep(10 * 1000);
      }
    } catch (InterruptedException e) {
      String message = com.poesys.db.Message.getMessage(INTERRUPT_MSG, null);
      logger.info(message);
    } catch (Exception e) {
      String message = com.poesys.db.Message.getMessage(LISTENER_MSG, null);
      logger.error(message, e);
    } finally {
      if (connection != null) {
        try {
          connection.close();
        } catch (JMSException e) {
          String message = com.poesys.db.Message.getMessage(LISTENER_MSG, null);
          logger.error(message, e);
        }
      }
    }
  }
 
  /*
   * (non-Javadoc)
   * 
   * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
   */
  @Override
  public void onMessage(Message message) {
    IPrimaryKey key = null;
    String cacheName = null;
 
    if (message == null) {
      logger.error("Cache message listener received null message");
    } else {
      try {
        logger.debug("Received cache removal request");
        // Get the message and extract the key and the cache name.
        ObjectMessage objectMessage = (ObjectMessage)message;
        if (objectMessage != null) {
          // Message key is the object payload.
          Serializable object = objectMessage.getObject();
          if (object instanceof com.poesys.ms.pk.IPrimaryKey) {
            // Reuse the payload already read above rather than calling
            // getObject() a second time.
            com.poesys.ms.pk.IPrimaryKey messageKey =
              (com.poesys.ms.pk.IPrimaryKey)object;
            // Translate into database primary key.
            key = MessageKeyFactory.getKey(messageKey);
            // Cache name is a property.
            cacheName = objectMessage.getStringProperty(CACHE_NAME_PROPERTY);
            IDtoCache<?> cache = DaoManager.getCache(cacheName);
            // Remove the object from the local cache only if it's there; if
            // it's not there, move on since there's nothing to do.
            if (cache != null) {
              logger.debug("Removing key " + key.getValueList() + " from cache "
                           + cacheName);
              cache.removeLocally(key);
            } else {
              logger.debug("No cache from which to remove object");
            }
          } else {
            logger.error("Cache message listener received message with a "
                         + "payload that was not a primary key");
          }
        }
      } catch (JMSException e) {
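        // Log full information and ignore. The key is still null if the
        // failure happened before it was extracted, so guard against that.
        Object[] objects =
          { cacheName, key == null ? null : key.getValueList() };
        String errorMsg = com.poesys.db.Message.getMessage(DELETE_MSG, objects);
        logger.error(errorMsg, e);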
      } catch (RuntimeException e) {
        // log and ignore
        logger.error("Runtime exception in onMessage: ", e);
      }
    }
  }
}


This is the Java code that sends the message:


  @Override
  public void remove(IPrimaryKey key) {
    // Send a message asking the listeners to remove the object. This removes
    // the object from all listening caches with the cache name of this cache,
    // including THIS one.
    Connection connection = null;
    try {
      Context initial = new InitialContext();
      ConnectionFactory cf =
        (ConnectionFactory)initial.lookup(CacheMessageListener.CONNECTION_FACTORY);
      Destination deleteTopic =
        (Destination)initial.lookup(CacheMessageListener.DELETE_TOPIC);
      connection = cf.createConnection();
      Session session =
        connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      MessageProducer producer = session.createProducer(deleteTopic);
      connection.start();
      ObjectMessage om = session.createObjectMessage(key.getMessageObject());
      om.setStringProperty(CacheMessageListener.CACHE_NAME_PROPERTY,
                           getCacheName());
      producer.send(om);
      logger.debug("Sent message to remove " + key.getValueList()
                   + " from cache " + getCacheName());
    } catch (Exception e) {
      Object[] objects = { getCacheName() };
      String message = com.poesys.db.Message.getMessage(PRODUCER_MSG, objects);
      logger.error(message, e);
    } finally {
      if (connection != null) {
        try {
          connection.close();
        } catch (JMSException e) {
          Object[] objects = { getCacheName() };
          String message =
            com.poesys.db.Message.getMessage(PRODUCER_MSG, objects);
          logger.error(message, e);
        }
      }
    }
  }
--------------------------------------------------------------
