/*
 * Copyright (C) The Apache Software Foundation. All rights reserved.
 *
 * This software is published under the terms of the Apache Software License
 * version 1.1, a copy of which has been included with this distribution in
 * the LICENSE file.
 *
 * $Id: MessengerSupport.java,v 1.1 2002/01/31 19:44:27 william Exp $
 */
package org.apache.commons.messenger;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.LinkedList;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueRequestor;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.ServerSessionPool;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import org.apache.log4j.*;

/** <p><code>MessengerSupport</code> is an abstract base class which implements
  * most of the functionality of Messenger. Derivations need to specify the
  * connection and session creation and the pooling strategy.</p>
  *
  * @author <a href="mailto:jstrachan@apache.org">James Strachan</a>
  * @version $Revision: 1.1 $
  */
public abstract class MessengerSupport implements Messenger {

	private static final boolean CACHE_REQUESTOR = true;

	private static final Category log = Category.getInstance( "org.apache.commons.messenger" );

	/** The name of the Messenger */
	private String name;

	/** are topic subscribers durable? */
	private boolean durable;

	/** the durable name used for durable topic based subscriptions */
	private String durableName;

	/** whether local messages are ignored when topic based subscription is used
	 * with a message selector */
	private boolean noLocal;

	/** A Map of ListenerKey objects to MessageConsumer objects */
	private Map listeners = new HashMap();

	/** A Map of MessageConsumer objects indexed by Destination or Destination and selector */
	private Map consumers = new HashMap();

	/** A Map of MessageProducer objects indexed by Destination */
	private Map producers = new HashMap();

	///** A Map of Queue or Topic Requestors indexed by Destination */
	//private Map requestors = new HashMap();
	private ThreadLocal requestorsMap = new ThreadLocal() {
		protected Object initialValue() {
			return new HashMap();
		}
	};

	/** The inbox which is used for the call() methods */
	private Destination replyToDestination;


	public MessengerSupport() {
	}

	public String toString() {
		try {
			Session session = borrowSession();
			String answer = super.toString() + " session: " + session.toString();
			returnSession( session );
			return answer;
		}
		catch (Exception e) {
			return super.toString() + " session: " + e.toString();
		}
	}

	public Destination getDestination(String subject) throws JMSException {
		Session session = borrowSession();
		try {
			if ( session instanceof TopicSession ) {
				return getTopic( (TopicSession) session, subject );
			}
			else {
				return getQueue( (QueueSession) session, subject );
			}
		}
		finally {
			returnSession( session );
		}
	}

	public Destination createTemporaryDestination() throws JMSException {
		Session session = borrowSession();
		try {
			if ( session instanceof TopicSession ) {
				TopicSession topicSession = (TopicSession) session;
				return topicSession.createTemporaryTopic();
			}
			else {
				QueueSession queueSession = (QueueSession) session;
				return queueSession.createTemporaryQueue();
			}
		}
		finally {
			returnSession( session );
		}
	}

	public void send( Destination destination, Message message ) throws JMSException {
		log.info("Try to send message: " + message);
		Session session = borrowSession();
		MessageProducer producer = null;
		try {
			producer = getMessageProducer( session, destination );
			if ( producer instanceof TopicPublisher ) {
				((TopicPublisher) producer).publish( message );
				log.info("Sent on topic: " + destination + " message: " + message);
			}
			else {
				((QueueSender) producer).send( message );
				log.info("Sent on queue: " + destination + " message: " + message);
			}
		}
		finally {
			producer.close();
			returnSession( session );
		}
	}

	public Message call( Destination destination, Message message ) throws JMSException {
		Session session = borrowSession();
		try {
			if ( session instanceof TopicSession ) {
				TopicRequestor requestor = getTopicRequestor( (TopicSession) session, (Topic) destination );
				return requestor.request( message );
			}
			else {
				QueueRequestor requestor = getQueueRequestor( (QueueSession) session, (Queue) destination );
				return requestor.request( message );
			}
		}
		finally {
			returnSession( session );
		}
	}

/*
	public Message call( Destination destination, Message message ) throws JMSException {
		Session session = borrowSession();
		try {
			Destination replyTo = getReplyToDestination();
			message.setJMSReplyTo(replyTo);

			MessageProducer producer = getMessageProducer( session, destination );
			MessageConsumer consumer = getMessageConsumer( session, replyTo );

			if ( session instanceof TopicSession ) {
				((TopicPublisher) producer).publish( message );
			}
			else {
				((QueueSender) producer).send( message );
			}
			return consumer.receive();
		}
		finally {
			returnSession( session );
		}
	}
*/

	public Message call( Destination destination, Message message, long timeoutMillis ) throws JMSException {
		Session session = borrowSession();
		MessageProducer producer = null;
		MessageConsumer consumer = null;
		try {
			Destination replyTo = getReplyToDestination();
			message.setJMSReplyTo(replyTo);

			producer = getMessageProducer( session, destination );
			consumer = getMessageConsumer( session, replyTo );

			if ( session instanceof TopicSession ) {
				((TopicPublisher) producer).publish( message );
			}
			else {
				((QueueSender) producer).send( message );
			}
			return consumer.receive(timeoutMillis);
		}
		finally {
			producer.close();
			returnSession( session );
		}
	}

	public Message receive(Destination destination) throws JMSException {
		Session session = borrowSession();
		MessageConsumer consumer = null;
		try {
			consumer = getMessageConsumer( session, destination );
			return consumer.receive();
		}
		finally {
			returnSession( session );
		}
	}

	public Message receive(Destination destination, String selector) throws JMSException {
		Session session = borrowSession();
		MessageConsumer consumer = null;
		try {
			consumer = getMessageConsumer( session, destination, selector );
			return consumer.receive();
		}
		finally {
			returnSession( session );
		}
	}

	public Message receive(Destination destination, long timeoutMillis) throws JMSException {
		Session session = borrowSession();
		MessageConsumer consumer = null;
		try {
			consumer = getMessageConsumer( session, destination );
			return consumer.receive(timeoutMillis);
		}
		finally {
			returnSession( session );
		}
	}

	public Message receive(Destination destination, String selector, long timeoutMillis) throws JMSException {
		Session session = borrowSession();
		MessageConsumer consumer = null;
		try {
			consumer = getMessageConsumer( session, destination, selector );
			return consumer.receive(timeoutMillis);
		}
		finally {
			returnSession( session );
		}
	}

	public Message receiveNoWait(Destination destination) throws JMSException {
		Session session = borrowSession();
		MessageConsumer consumer = null;
		try {
			consumer = getMessageConsumer( session, destination );
			return consumer.receiveNoWait();
		}
		finally {
			returnSession( session );
		}
	}

	public Message receiveNoWait(Destination destination, String selector) throws JMSException {
		Session session = borrowSession();
		MessageConsumer consumer = null;
		try {
			consumer = getMessageConsumer( session, destination, selector );
			return consumer.receiveNoWait();
		}
		finally {
			returnSession( session );
		}
	}

	public MessageConsumer createConsumer(Destination destination) throws JMSException {
		Session session = borrowSession();
		try {
			return createMessageConsumer( session, destination );
		}
		finally {
			returnSession( session );
		}
	}

	public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
		Session session = borrowSession();
		try {
			return createMessageConsumer( session, destination, selector );
		}
		finally {
			returnSession( session );
		}
	}

	public void run() {
		// don't return sessions which throw an exception
		try {
			Session session = borrowSession();
			session.run();
			returnSession( session );
		}
		catch (JMSException e) {
			// ### ignore
		}
	}

	public ConnectionConsumer createConnectionConsumer(Destination destination, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
		return createConnectionConsumer(destination, null, sessionPool, maxMessages);
	}

	public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
		Connection connection = getConnection();
		if ( connection instanceof TopicConnection ) {
			TopicConnection topicConnection = (TopicConnection) connection;
			if ( isDurable() ) {
				return topicConnection.createDurableConnectionConsumer( (Topic) destination, getDurableName(), selector, sessionPool, maxMessages );
			}
			else {
				return topicConnection.createConnectionConsumer( (Topic) destination, selector, sessionPool, maxMessages );
			}
		}
		else {
			QueueConnection queueConnection = (QueueConnection) connection;
			return queueConnection.createConnectionConsumer( (Queue) destination, selector, sessionPool, maxMessages );
		}
	}

	public abstract Connection getConnection() throws JMSException;

	// Listener API
	//-------------------------------------------------------------------------

	public void addListener(Destination destination, MessageListener listener) throws JMSException {
		if ( listener instanceof MessengerListener ) {
			MessengerListener messengerListener = (MessengerListener) listener;
			messengerListener.setMessenger( this );
		}
		Session session = borrowListenerSession();
		try {
			MessageConsumer consumer = createMessageConsumer( session, destination );
			consumer.setMessageListener( listener );

			ListenerKey key = new ListenerKey( destination, listener );
			listeners.put( key, consumer );
		}
		finally {
			returnListenerSession( session );
		}
	}

	public void addListener(Destination destination, String selector, MessageListener listener) throws JMSException {
		if ( listener instanceof MessengerListener ) {
			MessengerListener messengerListener = (MessengerListener) listener;
			messengerListener.setMessenger( this );
		}
		Session session = borrowListenerSession();
		try {
			MessageConsumer consumer = createMessageConsumer( session, destination, selector );
			consumer.setMessageListener( listener );

			ListenerKey key = new ListenerKey( destination, listener, selector );
			listeners.put( key, consumer );
		}
		finally {
			returnListenerSession( session );
		}
	}


	public void removeListener(Destination destination, MessageListener listener ) throws JMSException {
		ListenerKey key = new ListenerKey( destination, listener );
		MessageConsumer consumer = (MessageConsumer) listeners.remove( key );
		if ( consumer == null ) {
			throw new JMSException( "The given listener object has not been added for the given destination" );
		}
		consumer.close();
	}

	public void removeListener(Destination destination, String selector, MessageListener listener ) throws JMSException {
		ListenerKey key = new ListenerKey( destination, listener, selector );
		MessageConsumer consumer = (MessageConsumer) listeners.remove( key );
		if ( consumer == null ) {
			throw new JMSException( "The given listener object has not been added for the given destination and selector" );
		}
		consumer.close();
	}


	// Message factory methods
	//-------------------------------------------------------------------------

	public BytesMessage createBytesMessage() throws JMSException {
		Session session = borrowSession();
		try {
			return session.createBytesMessage();
		}
		finally {
			returnSession( session );
		}
	}

	public MapMessage createMapMessage() throws JMSException {
		Session session = borrowSession();
		try {
			return session.createMapMessage();
		}
		finally {
			returnSession( session );
		}
	}

	public Message createMessage() throws JMSException {
		Session session = borrowSession();
		try {
			return session.createMessage();
		}
		finally {
			returnSession( session );
		}
	}

	public ObjectMessage createObjectMessage() throws JMSException {
		Session session = borrowSession();
		try {
			return session.createObjectMessage();
		}
		finally {
			returnSession( session );
		}
	}

	public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
		Session session = borrowSession();
		try {
			return session.createObjectMessage(object);
		}
		finally {
			returnSession( session );
		}
	}

	public StreamMessage createStreamMessage() throws JMSException {
		Session session = borrowSession();
		try {
			return session.createStreamMessage();
		}
		finally {
			returnSession( session );
		}
	}

	public TextMessage createTextMessage() throws JMSException {
		Session session = borrowSession();
		try {
			return session.createTextMessage();
		}
		finally {
			returnSession( session );
		}
	}

	public TextMessage createTextMessage(String text) throws JMSException {
		Session session = borrowSession();
		try {
			return session.createTextMessage(text);
		}
		finally {
			returnSession( session );
		}
	}

	public void commit() throws JMSException {
		Session session = borrowSession();
		try {
			session.commit();
		}
		finally {
			returnSession( session );
		}
	}

	public void rollback() throws JMSException {
		Session session = borrowSession();
		try {
			session.rollback();
		}
		finally {
			returnSession( session );
		}
	}

	public void close() throws JMSException {
		getConnection().close();
	}

	// Properties
	//-------------------------------------------------------------------------

	/** Gets the name that this Messenger is called in a MessengerManager */
	public String getName() {
		return name;
	}

	/** Sets the name that this Messenger is called in a MessengerManager */
	public void setName(String name) {
		this.name = name;
	}

	/** Gets whether topic subscribers are durable or not */
	public boolean isDurable() {
		return noLocal;
	}

	/** Sets whether topic subscribers are durable or not */
	public void setDurable(boolean durable) {
		this.durable = durable;
	}

	/** Returns the durable name used for durable topic based subscriptions */
	public String getDurableName() {
		return durableName;
	}

	/** Sets the durable name used for durable topic based subscriptions */
	public void setDurableName(String durableName) {
		this.durableName = durableName;
	}

	/** Gets whether local messages are ignored when topic based subscription is used
	 * with a message selector */
	public boolean isNoLocal() {
		return noLocal;
	}

	/** Sets whether local messages are ignored when topic based subscription is used
	 * with a message selector */
	public void setNoLocal(boolean noLocal) {
		this.noLocal = noLocal;
	}

	// Implementation methods
	//-------------------------------------------------------------------------


	/** Borrows a session instance from the pool */
	protected abstract Session borrowSession() throws JMSException;

	/** Returns a session instance back to the pool */
	protected abstract void returnSession(Session session) throws JMSException;

	/** Deletes a session instance */
	protected abstract void deleteSession(Session session) throws JMSException;

	/** Borrows a session instance from the pool */
	protected abstract Session borrowListenerSession() throws JMSException;

	/** Returns a session instance back to the pool */
	protected abstract void returnListenerSession(Session session) throws JMSException;

	/** Returns a message producer for the given session and destination */
	protected MessageProducer getMessageProducer( Session session, Destination destination ) throws JMSException {
		return createMessageProducer( session, destination );
/*
		MessageProducer producer;
		synchronized (producers) {
			producer = (MessageProducer) producers.get( destination );
			if ( producer == null ) {
				producer = createMessageProducer( session, destination );
				producers.put( destination, producer );
			}
		}
		return producer;
*/
	}

	/** Returns a newly created message producer for the given session and destination */
	protected MessageProducer createMessageProducer( Session session, Destination destination ) throws JMSException {
		if ( session instanceof TopicSession ) {
			TopicSession topicSession = (TopicSession) session;
			return topicSession.createPublisher( (Topic) destination );
		}
		else {
			QueueSession queueSession = (QueueSession) session;
			return queueSession.createSender( (Queue) destination );
		}
	}

	/** Returns a MessageConsumer for the given session and destination */
	protected MessageConsumer getMessageConsumer( Session session, Destination destination ) throws JMSException {
		return createMessageConsumer( session, destination );
/*
		MessageConsumer consumer;
		synchronized (consumers) {
			consumer = (MessageConsumer) consumers.get( destination );
			if ( consumer == null ) {
				consumer = createMessageConsumer( session, destination );
				consumers.put( destination, consumer );
			}
		}
		return consumer;
*/
	}

	/** Returns a MessageConsumer for the given session, destination and selector */
	protected MessageConsumer getMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
		return createMessageConsumer( session, destination, selector );
/*
		MessageConsumer consumer;
		synchronized (consumers) {
			consumer = (MessageConsumer) consumers.get( destination );
			if ( consumer == null ) {
				consumer = createMessageConsumer( session, destination, selector );
				consumers.put( destination, consumer );
			}
		}
		return consumer;
*/
	}

	/** Returns a new MessageConsumer for the given session and destination */
	protected MessageConsumer createMessageConsumer( Session session, Destination destination ) throws JMSException {
		if ( session instanceof TopicSession ) {
			TopicSession topicSession = (TopicSession) session;
			if ( isDurable() ) {
				return topicSession.createDurableSubscriber(
					(Topic) destination,
					getDurableName()
				);
			}
			else {
				return topicSession.createSubscriber(
					(Topic) destination
				);
			}
		}
		else {
			QueueSession queueSession = (QueueSession) session;
			return queueSession.createReceiver( (Queue) destination );
		}
	}

	/** Returns a new MessageConsumer for the given session, destination and selector */
	protected MessageConsumer createMessageConsumer( Session session, Destination destination, String selector ) throws JMSException {
		if ( session instanceof TopicSession ) {
			TopicSession topicSession = (TopicSession) session;
			if ( isDurable() ) {
				return topicSession.createDurableSubscriber(
					(Topic) destination,
					getDurableName(),
					selector,
					isNoLocal()
				);
			}
			else {
				return topicSession.createSubscriber(
					(Topic) destination,
					selector,
					isNoLocal()
				);
			}
		}
		else {
			QueueSession queueSession = (QueueSession) session;
			return queueSession.createReceiver(
				(Queue) destination,
				selector
			);
		}
	}

	protected Queue getQueue(QueueSession session, String subject) throws JMSException {
		// XXXX: might want to cache
		return session.createQueue( subject );
	}

	protected Topic getTopic(TopicSession session, String subject) throws JMSException {
		// XXXX: might want to cache
		return session.createTopic( subject );
	}

	protected Destination getReplyToDestination() throws JMSException {
		if ( replyToDestination == null ) {
			replyToDestination = createTemporaryDestination();
		}
		return replyToDestination;
	}

	protected TopicRequestor getTopicRequestor( TopicSession session, Topic destination ) throws JMSException {
		if ( CACHE_REQUESTOR ) {
			Map requestors = (Map) requestorsMap.get();
			TopicRequestor requestor = (TopicRequestor) requestors.get( destination );
			if ( requestor == null ) {
				requestor = new TopicRequestor( session, destination );
				requestors.put( destination, requestor );
			}
			return requestor;
		}
		else {
			return new TopicRequestor( session, destination );
		}
	}

	protected QueueRequestor getQueueRequestor( QueueSession session, Queue destination ) throws JMSException {
		if ( CACHE_REQUESTOR ) {
			Map requestors = (Map) requestorsMap.get();
			QueueRequestor requestor = (QueueRequestor) requestors.get( destination );
			if ( requestor == null ) {
				requestor = new QueueRequestor( session, destination );
				requestors.put( destination, requestor );
			}
			return requestor;
		}
		else {
			return new QueueRequestor( session, destination );
		}
	}

}

