I am trying to implement a system that will require consistently high volume
with long running and constantly connected clients. To date I have followed
the samples included with JBoss Messaging and am creating a new
connection+session+temp_response_queue on a per request basis. It was
suggested that some of this is actually an anti-pattern.
This actually works fairly well and sufficiently fast on my local development
environment (i.e., my laptop), but breaks down when I promote it to a clustered
development environment that is otherwise identical. I fairly quickly run out
of memory, and get a 25+% failure rate even at low volumes. I am inclined to
believe it is in part due to the way in which I am accessing the queues and
topics.
I would like advice on the best way to handle the "pooling" of some of these
objects (connections and sessions) for reuse. Is this advisable at all? If
so, are there any example solutions anyone is aware of? I would hate to have to
create my own pooling logic with all the inherent issues in high volume systems.
One example of where I do not see the feasibility of this is in the connection
object. I do not see any exposed way in the API to "test" the connectedness of
the connection objects. I would assume in any pooling situation I would have
to make sure it is still alive, etc.
Any help would be appreciated.
Thanks,
Brandon
=======================================================
Solution Description
Client: A simple one-form, one-page JSF app
Server: A fairly simple MDB. It currently makes an HTTP request and echoes the
response to a client-created temporary response queue. It also forwards a copy
to an "observer" topic for further SOA visibility.
| public String executeSynchronous(String resourceUrl, HTTPMethod method,
String payload, final long timeout) {
| JMSUtil jmsUtil = new JMSUtil();
| if (timeout > MAX_TIMEOUT) {
| return "Error: timeout provided exeeds MAX_TIMEOUT(" +
MAX_TIMEOUT + ")";
| }
|
| Map<String, String> headers = new HashMap<String, String>();
| headers.put("Method", method.name());
| headers.put("URL", resourceUrl);
|
| String messageId = JMSUtil.generateUUID();
| Session session = jmsUtil.generateSession();
| Queue responseQueue = jmsUtil.generateResponseQueue(session);
| jmsUtil.sendSOAQueueMessage(session, payload,
jndiReferenceSOAIn, headers, messageId, responseQueue);
| String response = jmsUtil.getSOAResponse(session,
responseQueue, timeout);
| jmsUtil.closeSession(session);
|
| return response;
| }
|
calls...
| public void sendSOAQueueMessage(Session session, String payload, String
jndiReference, Map<String, String> headers, String messageId, Queue
responseQueue) {
| try {
| // Create Message
| TextMessage message =
session.createTextMessage(payload);
|
| // Assign Headers
| for (String headerName : headers.keySet()) {
| message.setStringProperty(headerName,
headers.get(headerName));
| }
|
| // Set UUIDreturn null;
| message.setStringProperty("EntertainmentSOARequestId",
messageId);
| message.setJMSReplyTo(responseQueue);
|
| // Create the producer.
|
| MessageProducer sender =
session.createProducer(getSOAInQueue());
| sender.send(message);
| sender.close();
|
| // Commit if needed
| if (transacted) {
| session.commit();
| }
| } catch (JMSException e) {
| logger.error("A JMS Exception occurred in sending a JMS
SOA message!", e);
| }
| }
|
Supported by the following util methods...
| /**
|  * Returns the shared JMS connection, looking up the connection factory and
|  * creating and starting the connection on first use.
|  *
|  * The method is synchronized because it lazily initialises static state;
|  * without that, concurrent first callers could each create a connection.
|  * The static field is only assigned once the connection has been started,
|  * so a failed start is not cached and the next call tries again.
|  *
|  * @return the started connection, or {@code null} when the JNDI lookup or
|  *         the connection setup fails
|  */
| protected static synchronized Connection getConnection() {
|     if (connection != null) {
|         return connection;
|     }
|
|     Hashtable<String, String> properties = new Hashtable<String, String>();
|     properties.put(Context.INITIAL_CONTEXT_FACTORY,
|             "org.jnp.interfaces.NamingContextFactory");
|     properties.put(Context.URL_PKG_PREFIXES,
|             "org.jboss.naming:org.jnp.interfaces");
|     properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
|     properties.put(Context.SECURITY_PRINCIPAL, "not_real");
|     properties.put(Context.SECURITY_CREDENTIALS, "not_real");
|
|     Context context = null;
|     try {
|         context = new InitialContext(properties);
|         connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory");
|     } catch (NamingException e) {
|         e.printStackTrace();
|         return null;
|     } finally {
|         // The context is only needed for the lookup; release it afterwards.
|         if (context != null) {
|             try {
|                 context.close();
|             } catch (NamingException e) {
|                 e.printStackTrace();
|             }
|         }
|     }
|
|     Connection candidate = null;
|     try {
|         candidate = connectionFactory.createConnection();
|         candidate.start();
|     } catch (JMSException e) {
|         // Report the failure the same way as the lookup failure above
|         // instead of dropping it silently.
|         e.printStackTrace();
|         if (candidate != null) {
|             try {
|                 candidate.close();
|             } catch (JMSException closeFailure) {
|                 closeFailure.printStackTrace();
|             }
|         }
|         return null;
|     }
|
|     connection = candidate;
|     return connection;
| }
| public Queue generateResponseQueue(Session session) {
| try {
| return session.createTemporaryQueue();
| } catch (JMSException e) {
| logger.error("Could not generate temporary queue for
receiving response", e);
| return null;
| }
| }
| public Session generateSession() {
| try {
| // Get a connection
| Connection connection = getConnection();
|
| // Create the session
| return connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
| } catch (JMSException e) {
| logger.error("Could not generate session", e);
| return null;
| }
| }
| public String getSOAResponse(Session session, Destination responseQueue,
long timeout) {
| try {
| // Create the consumer
| MessageConsumer consumer =
session.createConsumer(responseQueue);
| Message message = consumer.receive(timeout);
| message.acknowledge();
| consumer.close();
|
| if (message != null && message instanceof TextMessage) {
| return ((TextMessage) message).getText();
| } else {
| logger.error("Recieved message was not of type
TextMessage!");
| return "Recieved message was not of type
TextMessage!";
| }
|
| } catch (JMSException e) {
| logger.error("A JMS Exception occurred in receiving a
JMS SOA message!", e);
| return "A JMS Exception occurred in receiving a JMS SOA
message! " + e.getMessage();
| }
| }
|
MDB
| @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName
= "destinationType", propertyValue = "javax.jms.Queue"),
| @ActivationConfigProperty(propertyName = "destination",
propertyValue = "queue/SOAInQueue"),
| @ActivationConfigProperty(propertyName = "acknowledgeMode",
propertyValue = "Auto-acknowledge") })
| public class SOARouterMDB implements MessageListener {
|
| private final RESTAccessor restAccessor = new RESTAccessor();
|
| private static Logger logger = Logger.getLogger(SOARouterMDB.class);
|
| public void onMessage(Message msg) {
| try {
| Responder responder = new Responder();
| TextMessage tmsg = (TextMessage) msg;
|
| HTTPMethod method =
HTTPMethod.valueOf(tmsg.getStringProperty("Method"));
| String url = tmsg.getStringProperty("URL");
| Map<String, String> httpHeaders = new HashMap<String,
String>();
| Map<String, String> allHeaders = new HashMap<String,
String>();
| Destination destination = tmsg.getJMSReplyTo();
|
| System.out.println("Destination: " + destination);
|
| // Load HttpHeaders
| Enumeration<String> propertyNames =
tmsg.getPropertyNames();
| while (propertyNames.hasMoreElements()) {
| String propertyName =
propertyNames.nextElement();
| if (propertyName.startsWith("HttpHeader")) {
|
httpHeaders.put(propertyName.substring(10),
tmsg.getStringProperty(propertyName));
| }
| allHeaders.put(propertyName,
tmsg.getStringProperty(propertyName));
| }
|
| HTTPResponse response = null;
|
| switch (method) {
| case GET:
| response = restAccessor.doGet(url, httpHeaders);
| if (!response.isError()) {
| responder.respond(destination,
allHeaders, response.getData());
| responder.broadcast(allHeaders,
response.getData());
| } else {
| responder.reportError(tmsg);
| }
| break;
| case POST:
| response = restAccessor.doPost(url,
httpHeaders, new ByteArrayInputStream(tmsg.getText().getBytes()));
| if (!response.isError()) {
| responder.respond(destination,
allHeaders, response.getData());
| responder.broadcast(allHeaders,
response.getData());
| } else {
| responder.reportError(tmsg);
| }
| break;
| case DELETE:
| response = restAccessor.doDelete(url,
httpHeaders, tmsg.getText());
| if (!response.isError()) {
| responder.respond(destination,
allHeaders, response.getData());
| responder.broadcast(allHeaders,
response.getData());
| } else {
| responder.reportError(tmsg);
| }
| break;
| case PUT:
| response = restAccessor.doPost(url,
httpHeaders, new ByteArrayInputStream(tmsg.getText().getBytes()));
| if (!response.isError()) {
| responder.respond(destination,
allHeaders, response.getData());
| responder.broadcast(allHeaders,
response.getData());
| } else {
| responder.reportError(tmsg);
| }
| break;
| default:
| // TODO: asdf
| }
| } catch (Exception e) {
| logger.error("Error processing SOA request", e);
| }
| }
|
| @PreDestroy
| public void destroy() {
| }
|
| }
|
View the original post :
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4177242#4177242
Reply to the post :
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4177242
_______________________________________________
jboss-user mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/jboss-user