If someone had posted something like this earlier, it would have saved me
weeks of work.

1.      This is a fully-functional quote publisher  (with  quote objects, db
stuff, and a few other things  
removed).
2.      It has an embedded broker.
3.      When quotes come in, it will publish quotes only if there is a 
subscriber
for a quote.
4.      It will remove publishers when there are no longer clients subscribed to
a topic.
5.      It sets memory and prefetch limits.
6.      It evicts messages for slow consumers.
7.      It includes a temporary topic request for the most recent quote, and
replies to only the single requesting consumer.


Any recommendations on how to improve would be appreciated.

Enjoy.



package server;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import messages.BarsRequestMessage;
import messages.QuoteRequestMessage;
import messages.TicksRequestMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicPublisher;
import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.ActiveMQTopicSubscriber;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import
org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.CacheEntryList;
import org.apache.activemq.memory.CacheEvictionUsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.log4j.Logger;

public class QuoteProducer implements MessageListener
{
        // Date/time pattern "MM/dd/yyyy HH:mm:ss". Not referenced by the code visible in this class.
        // NOTE(review): SimpleDateFormat is not thread-safe; do not share this instance across threads.
        protected static final DateFormat dateTimeFormat = new
SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
        // Connector URIs registered on the embedded broker by createBroker().
        protected static final String tcpURL = "tcp://localhost:3000";
        // Also the broker URL handed to the connection factory in createConnection().
        protected static final String vmURL = "vm://localhost:4000";
        static Logger logger = Logger.getLogger(QuoteProducer.class);
        // Connection and topic session created by the constructor; the session is used for the
        // advisory/request subscribers, for the per-topic publishers and for creating quote messages.
        protected ActiveMQConnection connection = null;
        protected ActiveMQTopicSession session = null;
        // Topic name -> publisher. Entries are added by addSubscription() and removed by
        // removeSubscription(); a topic being present is what subscribersForTopic() reports.
        protected HashMap<String, ActiveMQTopicPublisher> topicMap;
        // Embedded broker returned by createBroker().
        protected BrokerService broker;
        // The next three fields are never assigned in the visible code; the inner QuoteListener
        // class declares its own fields with the same names and uses those instead.
        protected InetAddress multiCastReceiveAddress = null;
        protected int multiCastReceivePort;
        protected MulticastSocket multiCastReceiveSocket = null;
        // Multicast feed reader created by the constructor.
        protected QuoteListener feedThread;
        
        /**
         * Wires the server together: publisher registry, embedded broker, client
         * connection and session, the advisory and request listeners, and finally the
         * multicast feed reader (group 238.0.0.9, port 12344).
         *
         * Any failure along the way is logged and printed; the constructor itself
         * never throws, so later steps are simply skipped after the first failure.
         */
        public QuoteProducer()
        {
                try
                {
                        //PropertyConfigurator.configure("log4j.properties");

                        this.topicMap = new HashMap<String, ActiveMQTopicPublisher>();
                        this.broker = createBroker();
                        this.connection = createConnection();
                        this.session = createSession(this.connection);

                        // Listeners must exist before the feed starts delivering quotes.
                        createTopicsListener();
                        createTemporaryTopicListener();

                        this.feedThread = new QuoteListener("238.0.0.9", 12344);
                }
                catch (Exception startupFailure)
                {
                        logger.error(startupFailure);
                        startupFailure.printStackTrace();
                }
        }
        
        /**
         * Creates, configures and starts the embedded, non-persistent broker.
         *
         * Configuration applied:
         * - default destination policy: at most 1000 pending messages per consumer,
         *   oldest messages evicted first, advisories sent when a topic has no consumers;
         * - memory limit of 200 MB with a FIFO cache evictor attached to the usage manager
         *   (thresholds 90 / 80 are passed straight to CacheEvictionUsageListener);
         * - JMX disabled, stored messages deleted on startup;
         * - connectors on {@link #tcpURL} and {@link #vmURL}.
         *
         * This method runs while the {@code broker} field is still unassigned (the
         * constructor stores the return value), so everything here must go through the
         * local instance, never through the field.
         *
         * @return the broker instance; if configuration or startup failed the error is
         *         logged and the returned broker may be only partially configured and
         *         not started
         */
        BrokerService createBroker()
        {
                BrokerService b = null;
                try
                {
                        b = new BrokerService();

                        PolicyEntry policy = new PolicyEntry();
                        ConstantPendingMessageLimitStrategy limitStrategy = new ConstantPendingMessageLimitStrategy();
                        limitStrategy.setLimit(1000);
                        policy.setPendingMessageLimitStrategy(limitStrategy);
                        policy.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
                        policy.setSendAdvisoryIfNoConsumers(true);

                        PolicyMap pMap = new PolicyMap();
                        pMap.setDefaultEntry(policy);

                        b.setDestinationPolicy(pMap);
                        b.setPersistent(false);
                        b.setUseJmx(false);

                        // Use the local broker here: the 'broker' field is still null at this point.
                        UsageManager um = b.getMemoryManager();
                        um.setLimit(1024 * 1024 * 200);
                        CacheEvictionUsageListener ceul =
                                        new CacheEvictionUsageListener(um, 90, 80, b.getTaskRunnerFactory());

                        CacheEntryList cel = new CacheEntryList();
                        ceul.add(cel.createFIFOCacheEvictor());
                        um.addUsageListener(ceul);

                        b.setDeleteAllMessagesOnStartup(true);
                        b.addConnector(tcpURL);
                        b.addConnector(vmURL);
                        b.start();
                }
                catch (Exception e)
                {
                        logger.error("Failed to create or start the embedded broker", e);
                        e.printStackTrace();
                }

                return b;
        }
        
        /**
         * Publishes a message on a topic, but only if a publisher is currently
         * registered for it (i.e. the topic has at least one known subscriber).
         *
         * The registry lookup is done under the same lock the add/remove methods use,
         * because this method is called from the feed thread while the registry is
         * modified from the JMS listener thread. The lock is released before the
         * actual send so a slow publish does not block subscription changes.
         *
         * @param topic   name of the topic to publish on
         * @param message message to publish; silently dropped when no publisher exists
         */
        protected void publishToTopic(String topic, ActiveMQObjectMessage message)
        {
                ActiveMQTopicPublisher publisher;
                synchronized (topicMap)
                {
                        publisher = topicMap.get(topic);
                }
                if (publisher == null)
                        return;

                try
                {
                        publisher.publish(message);
                }
                catch (JMSException e)
                {
                        logger.error("Failed to publish to topic " + topic, e);
                        e.printStackTrace();
                }
        }

        /**
         * Tells whether a publisher is registered for the given topic, which this
         * class treats as "somebody is subscribed to it".
         *
         * Takes the registry lock so the read is safe against concurrent
         * add/remove calls; the lock is reentrant, so callers that already hold it
         * (as removeSubscription does) are unaffected.
         *
         * @param topic topic name to look up
         * @return true if a publisher exists for the topic
         */
        protected boolean subscribersForTopic(String topic)
        {
                synchronized (topicMap)
                {
                        return topicMap.containsKey(topic);
                }
        }

        /**
         * Registers a publisher for a topic that has just gained a consumer.
         *
         * Topics whose names start with "TEMP" or "ID" are ignored
         * (presumably the request topic and broker-generated temporary destinations;
         * confirm against the client code). A topic that already has a publisher is
         * left alone.
         *
         * The "already registered?" check is made while holding the registry lock so
         * that check and insert form one atomic step.
         *
         * @param topic name of the topic a consumer subscribed to
         */
        protected void addSubscription(String topic)
        {
                if (topic.startsWith("TEMP") || topic.startsWith("ID"))
                        return;

                synchronized (topicMap)
                {
                        if (subscribersForTopic(topic))
                                return;

                        try
                        {
                                ActiveMQTopicPublisher publisher = createPublisher(session, topic);
                                topicMap.put(topic, publisher);
                                System.out.println("added topic: " + topic);
                        }
                        catch (JMSException e)
                        {
                                // Pass the exception itself so the stack trace is actually logged.
                                logger.error("Could not create publisher for topic " + topic, e);
                        }
                }
        }

        /**
         * Drops and closes the publisher of a topic that no longer has consumers.
         * Does nothing when no publisher is registered for the topic.
         *
         * @param topic name of the topic that lost its last consumer
         */
        protected void removeSubscription(String topic)
        {
                synchronized (topicMap)
                {
                        if (!subscribersForTopic(topic))
                                return;

                        ActiveMQTopicPublisher publisher = topicMap.remove(topic);
                        if (publisher == null)
                                return;

                        try
                        {
                                publisher.close();
                                System.out.println("removed topic: " + topic);
                        }
                        catch (JMSException e)
                        {
                                // Pass the exception itself so the stack trace is actually logged.
                                logger.error("Could not close publisher for topic " + topic, e);
                        }
                }
        }
        
        /**
         * Opens and starts a connection to the embedded broker over {@link #vmURL},
         * authenticating as "Data Server".
         *
         * The factory is tuned for throughput: async session dispatch and async sends,
         * optimized acknowledgements, no timestamps, no copy-on-send, compression
         * enabled, topic prefetch of 1 and a maximum pending message limit of 10.
         *
         * Failures are propagated to the caller instead of being swallowed: returning
         * null here only deferred the problem to a NullPointerException at the first
         * use of the connection. Both callers in this class already catch and log.
         *
         * @return a started connection, never null
         * @throws JMSException if the connection cannot be created or started
         * @throws Exception    kept in the signature for compatibility with existing callers
         */
        protected ActiveMQConnection createConnection() throws JMSException, Exception
        {
                ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
                prefetchPolicy.setTopicPrefetch(1);
                prefetchPolicy.setMaximumPendingMessageLimit(10);

                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                                "Data Server", AuthenticatingBroker.serverPassword, vmURL);
                connectionFactory.setAlwaysSessionAsync(true);
                connectionFactory.setUseAsyncSend(true);
                connectionFactory.setOptimizeAcknowledge(true);
                connectionFactory.setDisableTimeStampsByDefault(true);
                connectionFactory.setCopyMessageOnSend(false);
                connectionFactory.setUseCompression(true);
                connectionFactory.setPrefetchPolicy(prefetchPolicy);

                ActiveMQConnection con = (ActiveMQConnection) connectionFactory.createConnection();
                try
                {
                        con.start();
                }
                catch (JMSException e)
                {
                        // Do not leak a connection that could not be started.
                        try
                        {
                                con.close();
                        }
                        catch (JMSException ignored)
                        {
                                // The start failure is the error worth reporting.
                        }
                        throw e;
                }
                return con;
        }
        
        /**
         * Opens a non-transacted, auto-acknowledging topic session on the given connection.
         *
         * @param connection connection to create the session on
         * @return the new topic session
         * @throws Exception if the session cannot be created
         */
        protected ActiveMQTopicSession createSession(ActiveMQConnection connection) throws Exception
        {
                final boolean transacted = false;
                return (ActiveMQTopicSession) connection.createTopicSession(
                                transacted, ActiveMQSession.AUTO_ACKNOWLEDGE);
        }
        
        /**
         * Creates a publisher for the named topic, configured for non-persistent
         * delivery with message IDs and timestamps switched off.
         *
         * @param session  session used to create the topic and the publisher
         * @param msgTopic name of the topic to publish on
         * @return the configured publisher
         * @throws JMSException if the topic or publisher cannot be created
         */
        protected ActiveMQTopicPublisher createPublisher(ActiveMQTopicSession session, String msgTopic)
                        throws JMSException
        {
                ActiveMQTopic target = (ActiveMQTopic) session.createTopic(msgTopic);
                ActiveMQTopicPublisher result = (ActiveMQTopicPublisher) session.createPublisher(target);

                result.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                result.setDisableMessageID(true);
                result.setDisableMessageTimestamp(true);

                return result;
        }
        
        /**
         * Subscribes this object to the "TEMP MESSAGE" topic, on which request
         * messages arrive (see onMessage). Failures are logged and printed.
         */
        protected void createTemporaryTopicListener()
        {
                try
                {
                        ActiveMQTopic requestTopic = (ActiveMQTopic) session.createTopic("TEMP MESSAGE");
                        ActiveMQTopicSubscriber requestSubscriber =
                                        (ActiveMQTopicSubscriber) session.createSubscriber(requestTopic);
                        requestSubscriber.setMessageListener(this);
                }
                catch (JMSException failure)
                {
                        logger.error(failure);
                        failure.printStackTrace();
                }
        }
        protected void createTopicsListener()
        {
                try
                {
                        String msgTopic = ">"; // listen for advisories on all 
topics
                        ActiveMQTopic allTopics = (ActiveMQTopic) 
session.createTopic(msgTopic);
                        
                        ActiveMQTopic noConsumerTopic =
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber noConsumerSubscriber = 
(ActiveMQTopicSubscriber)
session
                                        .createSubscriber(noConsumerTopic);
                        noConsumerSubscriber.setMessageListener(this);

                        ActiveMQTopic consumerTopic =
AdvisorySupport.getConsumerAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber consumerSubscriber = 
(ActiveMQTopicSubscriber)
session
                                        .createSubscriber(consumerTopic);
                        consumerSubscriber.setMessageListener(this);

                        /*                      
                        ActiveMQTopic producerTopic =
AdvisorySupport.getProducerAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber producerSubscriber = 
(ActiveMQTopicSubscriber)
session
                                        .createSubscriber(producerTopic);
                        producerSubscriber.setMessageListener(this);

                        
                        ActiveMQTopic connectionTopic =
AdvisorySupport.getConnectionAdvisoryTopic();
                        ActiveMQTopicSubscriber connectionSubscriber = 
(ActiveMQTopicSubscriber)
session
                                        .createSubscriber(connectionTopic);
                        connectionSubscriber.setMessageListener(this);
                        ActiveMQTopic destinationTopic =
AdvisorySupport.getDestinationAdvisoryTopic(allTopics);
                        ActiveMQTopicSubscriber destinationSubscriber = 
(ActiveMQTopicSubscriber)
session
                                        .createSubscriber(destinationTopic);
                        destinationSubscriber.setMessageListener(this);
                        */
                }
                catch (JMSException e)
                {
                        logger.error(e);
                        e.printStackTrace();
                }
        }
        
        /**
         * Checks whether a destination is a "no topic consumers" advisory topic.
         * A composite destination qualifies as soon as any of its members does.
         *
         * @param destination destination to inspect
         * @return true if the destination (or one of its composite members) is a topic
         *         whose physical name starts with the no-topic-consumers advisory prefix
         */
        public static boolean isNoConsumerAdvisoryTopic(ActiveMQDestination destination)
        {
                if (!destination.isComposite())
                {
                        return destination.isTopic()
                                        && destination.getPhysicalName().startsWith(
                                                        AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX);
                }

                for (ActiveMQDestination member : destination.getCompositeDestinations())
                {
                        if (isNoConsumerAdvisoryTopic(member))
                        {
                                return true;
                        }
                }
                return false;
        }
        
        /**
         * Builds the "no topic consumers" advisory topic for a destination by
         * prepending the advisory prefix to the destination's physical name.
         *
         * @param destination destination to build the advisory topic for
         * @return the corresponding advisory topic
         */
        public static ActiveMQTopic getNoConsumerAdvisoryTopic(ActiveMQDestination destination)
        {
                String advisoryName = AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX
                                + destination.getPhysicalName();
                return new ActiveMQTopic(advisoryName);
        }
        
        /**
         * Recovers the plain topic name from a "no topic consumers" advisory topic
         * name by cutting off as many leading characters as the advisory prefix is long.
         * The argument is not checked to actually start with that prefix.
         *
         * @param topic physical name of the advisory topic
         * @return the remainder of the name after the prefix length
         */
        public static String getNoConsumerAdvisoryTopic(String topic)
        {
                int prefixLength = AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX.length();
                return topic.substring(prefixLength);
        }
        /**
         * Recovers the plain topic name from a consumer advisory topic name by cutting
         * off as many leading characters as the consumer advisory prefix is long.
         * The argument is not checked to actually start with that prefix.
         *
         * @param topic physical name of the advisory topic
         * @return the remainder of the name after the prefix length
         */
        public static String getConsumerAdvisoryTopic(String topic)
        {
                int prefixLength = AdvisorySupport.TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX.length();
                return topic.substring(prefixLength);
        }
        
        /**
         * Single entry point for everything this object listens to.
         *
         * - Consumer advisory carrying a ConsumerInfo: register a publisher for the
         *   advised topic (addSubscription).
         * - "No topic consumers" advisory: drop the publisher for that topic
         *   (removeSubscription).
         * - Any non-advisory object message: treated as a request; a
         *   QuoteRequestMessage payload spawns a QuoteMessageRequester that replies
         *   to the sender.
         *
         * Assumes every message delivered here is an ActiveMQMessage addressed to a
         * topic, which holds for the subscriptions created in this class.
         *
         * @param message the delivered advisory or request message
         */
        public void onMessage(Message message)
        {
                ActiveMQMessage activeMessage = (ActiveMQMessage) message;
                ActiveMQTopic destination = (ActiveMQTopic) activeMessage.getDestination();
                System.out.println(destination);

                if (AdvisorySupport.isAdvisoryTopic(destination))
                {
                        if (AdvisorySupport.isConsumerAdvisoryTopic(destination))
                        {
                                // instanceof is false for null, so no separate null check is needed.
                                if (activeMessage.getDataStructure() instanceof ConsumerInfo)
                                {
                                        String topic = getConsumerAdvisoryTopic(destination.getPhysicalName());
                                        addSubscription(topic);
                                }
                        }
                        else if (isNoConsumerAdvisoryTopic(destination))
                        {
                                String topic = getNoConsumerAdvisoryTopic(destination.getPhysicalName());
                                removeSubscription(topic);
                        }
                }
                else if (activeMessage instanceof ActiveMQObjectMessage)
                { // for our temporary request topics
                        ActiveMQObjectMessage requestMessage = (ActiveMQObjectMessage) activeMessage;
                        try
                        {
                                if (requestMessage.getObject() instanceof QuoteRequestMessage)
                                {
                                        // The requester starts its own thread from its constructor.
                                        new QuoteMessageRequester(requestMessage);
                                }
                        }
                        catch (JMSException e)
                        {
                                logger.error("Could not read request payload", e);
                                e.printStackTrace();
                        }
                }
        }
        
        class QuoteListener extends Thread
        {
                InetAddress multiCastReceiveAddress;
                int multiCastReceivePort;
                MulticastSocket multiCastReceiveSocket;
                DatagramPacket packet;

                public QuoteListener(String receiveAddress, int receivePort)
                {
                        try
                        {
                                this.multiCastReceiveAddress = 
InetAddress.getByName(receiveAddress);
                        }
                        catch (UnknownHostException e)
                        {
                                logger.error(e);
                                e.printStackTrace();
                        }
                        this.multiCastReceivePort = receivePort;
                        try
                        {
                                multiCastReceiveSocket = new 
MulticastSocket(multiCastReceivePort);
                                multiCastReceiveSocket.setTimeToLive(5);
                                
multiCastReceiveSocket.joinGroup(multiCastReceiveAddress);
                        }
                        catch (Exception e)
                        {
                                logger.error(e);
                                e.printStackTrace();
                        }
                        byte[] message = new byte[1000];
                        packet = new DatagramPacket(message, message.length);
                        start();
                }
                
                public void run()
                {
                        try
                        {
                                while (true)
                                {
                                        multiCastReceiveSocket.receive(packet);
                                        //long start = 
System.currentTimeMillis();
                                        processPacket();
                                        //long end = System.currentTimeMillis() 
- start;
                                        //System.out.println("End of wait for " 
+ end + " millis");
                                }
                        }
                        catch (Exception e)
                        {
                                e.printStackTrace();
                        }
                }
                
                public void processPacket()
                {
                        try 
                        {                               
                                String topic = "TEST.QUOTE";
                                if (!subscribersForTopic(topic))
                                        return;

                                String pretendObject= new String(topic + " " + 
"stuff");
                                ActiveMQObjectMessage objectMessage = 
(ActiveMQObjectMessage)
                                                session.createObjectMessage();
                                objectMessage.setObject(pretendObject);
                                if (topic.length() > 0 && objectMessage != null)
                                                        publishToTopic(topic, 
objectMessage);
                        }
                        catch (Exception e)
                        {
                                System.out.println(e.getStackTrace());
                        }
                }
        }

        class QuoteMessageRequester extends Thread
        {
                ActiveMQObjectMessage requestMessage;
                QuoteMessageRequester(ActiveMQObjectMessage requestMessage)
                {
                        this.requestMessage = requestMessage;
                        start();
                }
                public void run()
                {
                        try
                        {
                                String pretendObject= new String("Quote  " + 
"stuff");
                                ActiveMQObjectMessage objectMessage = 
(ActiveMQObjectMessage)
                                                session.createObjectMessage();
                                objectMessage.setObject(pretendObject);
                                
                                sendReply(requestMessage, pretendObject);
                        }
                        catch (Exception e)
                        {
                                logger.error(e);
                                e.printStackTrace();
                                return;
                        }
                }
        }


        /**
         * Sends a reply to the destination named in the request's JMSReplyTo header,
         * correlating it with the request via the request's message ID.
         *
         * A fresh connection and session are used for each reply and are always
         * closed again, whether or not the send succeeds. Requests without a reply-to
         * destination are logged and ignored. All failures are logged and printed;
         * nothing is thrown to the caller.
         *
         * @param in  the request message being answered
         * @param out the serializable payload of the reply
         */
        public synchronized void sendReply(Message in, Serializable out)
        {
                ActiveMQConnection con = null;
                try
                {
                        Destination replyDestination = in.getJMSReplyTo();
                        if (replyDestination == null)
                        {
                                logger.error("Cannot reply: request carries no JMSReplyTo destination");
                                return;
                        }

                        con = createConnection();
                        ActiveMQTopicSession ses = createSession(con);
                        ActiveMQObjectMessage replyMessage =
                                        (ActiveMQObjectMessage) ses.createObjectMessage(out);
                        replyMessage.setJMSCorrelationID(in.getJMSMessageID());

                        ActiveMQTopicPublisher pub =
                                        (ActiveMQTopicPublisher) ses.createPublisher((Topic) replyDestination);
                        pub.publish(replyMessage);
                        pub.close();
                        ses.close();
                }
                catch (Exception e)
                {
                        logger.error("Could not send reply", e);
                        e.printStackTrace();
                }
                finally
                {
                        // Closing the connection also releases any session or publisher
                        // left open by a failure above.
                        if (con != null)
                        {
                                try
                                {
                                        con.close();
                                }
                                catch (JMSException e)
                                {
                                        logger.error("Could not close reply connection", e);
                                }
                        }
                }
        }
        /**
         * Starts the quote producer and then waits on standard input until the
         * operator types "quit" (in any letter case), at which point the JVM exits.
         *
         * If standard input ends or cannot be read, the console loop stops but the
         * JVM is not forced to exit, so a server started without a console keeps
         * running on its own threads.
         *
         * NOTE(review): the posted version never created a QuoteProducer, so running it
         * did nothing; the instantiation below is assumed to be what was intended.
         *
         * @param args unused
         */
        public static void main(String[] args)
        {
                new QuoteProducer();

                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                System.out.println("Enter 'quit' to exit\n");
                try
                {
                        String str;
                        while ((str = br.readLine()) != null)
                        {
                                if (str.trim().equalsIgnoreCase("quit"))
                                        System.exit(0);
                        }
                }
                catch (IOException e)
                {
                        e.printStackTrace();
                }
        }
}

-- 
View this message in context: 
http://www.nabble.com/included%3A-fully-functional-%28almost%29-quote-producer-tf2482597.html#a6922824
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Reply via email to