If someone had posted something like this earlier, it would have saved me
weeks of work.
1. This is a fully-functional quote publisher (with quote objects, db
stuff, and a few other things
removed).
2. It has an embedded broker.
3. When quotes come in, it publishes a quote only if there is at least one
subscriber
for that quote's topic.
4. It will remove publishers when there are no longer clients subscribed to
a topic.
5. It sets memory and prefetch limits.
6. It evicts messages for slow consumers.
7. It includes a temporary topic request for the most recent quote, and
replies to only the single requesting consumer.
Any recommendations on how to improve would be appreciated.
enjoy.
package server;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.net.UnknownHostException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Topic;
import messages.BarsRequestMessage;
import messages.QuoteRequestMessage;
import messages.TicksRequestMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicPublisher;
import org.apache.activemq.ActiveMQTopicSession;
import org.apache.activemq.ActiveMQTopicSubscriber;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import
org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import
org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.CacheEntryList;
import org.apache.activemq.memory.CacheEvictionUsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.log4j.Logger;
public class QuoteProducer implements MessageListener
{
// Formatter for "MM/dd/yyyy HH:mm:ss" timestamps; not referenced anywhere else in this class.
// NOTE(review): SimpleDateFormat is not thread-safe; do not share this instance across threads.
protected static final DateFormat dateTimeFormat = new
SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
// Connector URI added to the embedded broker in createBroker().
protected static final String tcpURL = "tcp://localhost:3000";
// Connector URI added to the embedded broker; also the URI createConnection() connects to.
protected static final String vmURL = "vm://localhost:4000";
static Logger logger = Logger.getLogger(QuoteProducer.class);
// Connection and session created in the constructor. The session is used by the
// advisory/request listeners, by the per-topic publishers and by the feed thread.
protected ActiveMQConnection connection = null;
protected ActiveMQTopicSession session = null;
// Topic name -> publisher. Entries are added by addSubscription() and removed by
// removeSubscription(); both mutate the map while synchronized on it.
protected HashMap<String, ActiveMQTopicPublisher> topicMap;
// Embedded broker returned by createBroker().
protected BrokerService broker;
// These three multicast fields are never assigned in this class; QuoteListener
// declares its own fields with the same names and uses those instead.
protected InetAddress multiCastReceiveAddress = null;
protected int multiCastReceivePort;
protected MulticastSocket multiCastReceiveSocket = null;
// Multicast receiver thread; it starts itself inside its constructor.
protected QuoteListener feedThread;
/**
 * Wires the publisher together: creates the topic map, starts the embedded
 * broker, opens the connection and session, registers the advisory and
 * request listeners, and finally creates the multicast feed thread (which
 * starts itself).
 *
 * Any exception is logged and printed; the steps after the failing one are
 * skipped and the corresponding fields keep their initial values.
 */
public QuoteProducer()
{
    try
    {
        // log4j configuration is currently disabled:
        //PropertyConfigurator.configure("log4j.properties");
        this.topicMap = new HashMap<String, ActiveMQTopicPublisher>();
        this.broker = createBroker();
        this.connection = createConnection();
        this.session = createSession(this.connection);
        createTopicsListener();
        createTemporaryTopicListener();
        this.feedThread = new QuoteListener("238.0.0.9", 12344);
    }
    catch (Exception failure)
    {
        logger.error(failure);
        failure.printStackTrace();
    }
}
/**
 * Creates, configures and starts the embedded broker.
 *
 * The default destination policy caps pending messages per consumer at 1000,
 * evicts the oldest message first, and asks the broker to send an advisory
 * when a message is published to a destination without consumers. The
 * broker is non-persistent, runs without JMX, and listens on both
 * {@code tcpURL} and {@code vmURL}.
 *
 * @return the broker instance; if configuration or start-up failed the
 *         error has been logged and the returned broker may not be started
 *         (or may be {@code null} if it could not even be constructed)
 */
BrokerService createBroker()
{
    BrokerService b = null;
    try
    {
        b = new BrokerService();

        PolicyEntry policy = new PolicyEntry();
        ConstantPendingMessageLimitStrategy limitStrategy =
            new ConstantPendingMessageLimitStrategy();
        limitStrategy.setLimit(1000);
        policy.setPendingMessageLimitStrategy(limitStrategy);
        policy.setMessageEvictionStrategy(new OldestMessageEvictionStrategy());
        policy.setSendAdvisoryIfNoConsumers(true);

        PolicyMap pMap = new PolicyMap();
        pMap.setDefaultEntry(policy);
        b.setDestinationPolicy(pMap);
        b.setPersistent(false);
        b.setUseJmx(false);

        // Configure the broker being built. The 'broker' field is not assigned
        // until this method returns, so it must not be dereferenced here.
        UsageManager um = b.getMemoryManager();
        um.setLimit(1024 * 1024 * 200); // 200 * 2^20; presumably bytes - TODO confirm
        // NOTE(review): 90 / 80 look like high / low usage marks - confirm
        // against the CacheEvictionUsageListener constructor.
        CacheEvictionUsageListener ceul =
            new CacheEvictionUsageListener(um, 90, 80, b.getTaskRunnerFactory());
        CacheEntryList cel = new CacheEntryList();
        ceul.add(cel.createFIFOCacheEvictor());
        b.getMemoryManager().addUsageListener(ceul);

        b.setDeleteAllMessagesOnStartup(true);
        b.addConnector(tcpURL);
        b.addConnector(vmURL);
        b.start();
    }
    catch (Exception e)
    {
        logger.error(e);
        e.printStackTrace();
    }
    return b;
}
/**
 * Publishes {@code message} on {@code topic} if a publisher is currently
 * registered for that topic; otherwise does nothing.
 *
 * The publisher lookup is done while holding the {@code topicMap} lock,
 * because the map is a plain {@link HashMap} that other threads mutate in
 * addSubscription / removeSubscription under that same lock. The publish
 * itself happens outside the lock.
 *
 * @param topic   topic name used as the key into {@code topicMap}
 * @param message message to publish
 */
protected void publishToTopic(String topic, ActiveMQObjectMessage message)
{
    ActiveMQTopicPublisher publisher;
    synchronized (topicMap)
    {
        publisher = topicMap.get(topic);
    }
    if (publisher == null)
        return;
    try
    {
        publisher.publish(message);
    }
    catch (JMSException e)
    {
        logger.error(e);
        e.printStackTrace();
    }
}
/**
 * Reports whether a publisher is currently registered for {@code topic}.
 *
 * The lookup is synchronized on {@code topicMap} because the map is mutated
 * by other threads under that lock; the lock is reentrant, so callers that
 * already hold it are unaffected.
 *
 * @param topic topic name
 * @return {@code true} if {@code topicMap} contains the topic
 */
protected boolean subscribersForTopic(String topic)
{
    synchronized (topicMap)
    {
        return topicMap.containsKey(topic);
    }
}
/**
 * Registers a publisher for {@code topic} unless one already exists.
 *
 * Topics whose name starts with "TEMP" or "ID" are ignored. The existence
 * check and the insertion happen under the same {@code topicMap} lock so two
 * concurrent calls cannot both create a publisher for one topic.
 *
 * @param topic topic name taken from a consumer advisory
 */
protected void addSubscription(String topic)
{
    if (topic.startsWith("TEMP") || topic.startsWith("ID"))
        return;
    synchronized (topicMap)
    {
        if (topicMap.containsKey(topic))
            return;
        try
        {
            ActiveMQTopicPublisher publisher = createPublisher(session, topic);
            topicMap.put(topic, publisher);
            System.out.println("added topic: " + topic);
        }
        catch (JMSException e)
        {
            // Pass the exception itself; logging e.getStackTrace() only
            // prints the array reference.
            logger.error("Could not create publisher for topic " + topic, e);
            e.printStackTrace();
        }
    }
}
/**
 * Removes and closes the publisher registered for {@code topic}, if any.
 *
 * @param topic topic name taken from a no-consumer advisory
 */
protected void removeSubscription(String topic)
{
    synchronized (topicMap)
    {
        // remove() returns null when the topic is unknown (or mapped to
        // null), so no separate containsKey() check is needed.
        ActiveMQTopicPublisher publisher = topicMap.remove(topic);
        if (publisher == null)
            return;
        try
        {
            publisher.close();
            System.out.println("removed topic: " + topic);
        }
        catch (JMSException e)
        {
            // Pass the exception itself; logging e.getStackTrace() only
            // prints the array reference.
            logger.error("Could not close publisher for topic " + topic, e);
            e.printStackTrace();
        }
    }
}
/**
 * Opens and starts a connection to the embedded broker over {@code vmURL}.
 *
 * The factory is configured for asynchronous sessions and sends, optimized
 * acknowledgement, no default timestamps, no message copy on send,
 * compression, a topic prefetch of 1 and a maximum pending message limit
 * of 10.
 *
 * Failures propagate to the caller instead of being converted into a
 * {@code null} return, so callers see the real cause rather than a later
 * NullPointerException.
 *
 * @return a started connection, never {@code null}
 * @throws JMSException if the connection cannot be created or started
 * @throws Exception    declared for backward compatibility with existing callers
 */
protected ActiveMQConnection createConnection() throws JMSException,
        Exception
{
    ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
    prefetchPolicy.setTopicPrefetch(1);
    prefetchPolicy.setMaximumPendingMessageLimit(10);

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        "Data Server", AuthenticatingBroker.serverPassword, vmURL);
    connectionFactory.setAlwaysSessionAsync(true);
    connectionFactory.setUseAsyncSend(true);
    connectionFactory.setOptimizeAcknowledge(true);
    connectionFactory.setDisableTimeStampsByDefault(true);
    connectionFactory.setCopyMessageOnSend(false);
    connectionFactory.setUseCompression(true);
    connectionFactory.setPrefetchPolicy(prefetchPolicy);

    ActiveMQConnection con = (ActiveMQConnection) connectionFactory.createConnection();
    con.start();
    return con;
}
/**
 * Opens a non-transacted, auto-acknowledge topic session on the given
 * connection.
 *
 * @param connection connection to create the session on
 * @return the new topic session
 * @throws Exception if the session cannot be created
 */
protected ActiveMQTopicSession createSession(ActiveMQConnection connection)
        throws Exception
{
    return (ActiveMQTopicSession) connection.createTopicSession(
        false, ActiveMQSession.AUTO_ACKNOWLEDGE);
}
/**
 * Creates a publisher for the named topic, configured for non-persistent
 * delivery with message IDs and message timestamps disabled.
 *
 * @param session  session used to create the topic and the publisher
 * @param msgTopic name of the topic to publish on
 * @return the configured publisher
 * @throws JMSException if the topic or publisher cannot be created
 */
protected ActiveMQTopicPublisher createPublisher(ActiveMQTopicSession session,
        String msgTopic) throws JMSException
{
    ActiveMQTopic target = (ActiveMQTopic) session.createTopic(msgTopic);
    ActiveMQTopicPublisher result =
        (ActiveMQTopicPublisher) session.createPublisher(target);
    result.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    result.setDisableMessageID(true);
    result.setDisableMessageTimestamp(true);
    return result;
}
/**
 * Subscribes this object to the "TEMP MESSAGE" topic so that onMessage()
 * receives the messages sent there. JMS failures are logged and printed.
 */
protected void createTemporaryTopicListener()
{
    try
    {
        ActiveMQTopic requestTopic =
            (ActiveMQTopic) session.createTopic("TEMP MESSAGE");
        ActiveMQTopicSubscriber requestSubscriber =
            (ActiveMQTopicSubscriber) session.createSubscriber(requestTopic);
        requestSubscriber.setMessageListener(this);
    }
    catch (JMSException failure)
    {
        logger.error(failure);
        failure.printStackTrace();
    }
}
protected void createTopicsListener()
{
try
{
String msgTopic = ">"; // listen for advisories on all
topics
ActiveMQTopic allTopics = (ActiveMQTopic)
session.createTopic(msgTopic);
ActiveMQTopic noConsumerTopic =
AdvisorySupport.getNoTopicConsumersAdvisoryTopic(allTopics);
ActiveMQTopicSubscriber noConsumerSubscriber =
(ActiveMQTopicSubscriber)
session
.createSubscriber(noConsumerTopic);
noConsumerSubscriber.setMessageListener(this);
ActiveMQTopic consumerTopic =
AdvisorySupport.getConsumerAdvisoryTopic(allTopics);
ActiveMQTopicSubscriber consumerSubscriber =
(ActiveMQTopicSubscriber)
session
.createSubscriber(consumerTopic);
consumerSubscriber.setMessageListener(this);
/*
ActiveMQTopic producerTopic =
AdvisorySupport.getProducerAdvisoryTopic(allTopics);
ActiveMQTopicSubscriber producerSubscriber =
(ActiveMQTopicSubscriber)
session
.createSubscriber(producerTopic);
producerSubscriber.setMessageListener(this);
ActiveMQTopic connectionTopic =
AdvisorySupport.getConnectionAdvisoryTopic();
ActiveMQTopicSubscriber connectionSubscriber =
(ActiveMQTopicSubscriber)
session
.createSubscriber(connectionTopic);
connectionSubscriber.setMessageListener(this);
ActiveMQTopic destinationTopic =
AdvisorySupport.getDestinationAdvisoryTopic(allTopics);
ActiveMQTopicSubscriber destinationSubscriber =
(ActiveMQTopicSubscriber)
session
.createSubscriber(destinationTopic);
destinationSubscriber.setMessageListener(this);
*/
}
catch (JMSException e)
{
logger.error(e);
e.printStackTrace();
}
}
/**
 * Tells whether a destination is a "no topic consumers" advisory topic.
 *
 * A simple destination qualifies when it is a topic whose physical name
 * starts with AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX. A composite
 * destination qualifies when any of its members does (checked recursively).
 *
 * @param destination destination to inspect
 * @return {@code true} if the destination, or any composite member, matches
 */
public static boolean isNoConsumerAdvisoryTopic(ActiveMQDestination destination)
{
    if (!destination.isComposite())
    {
        return destination.isTopic()
            && destination.getPhysicalName().startsWith(
                   AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX);
    }
    for (ActiveMQDestination member : destination.getCompositeDestinations())
    {
        if (isNoConsumerAdvisoryTopic(member))
        {
            return true;
        }
    }
    return false;
}
/**
 * Builds the "no topic consumers" advisory topic for a destination by
 * prepending AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX to its
 * physical name.
 *
 * @param destination destination whose advisory topic is wanted
 * @return the advisory topic
 */
public static ActiveMQTopic getNoConsumerAdvisoryTopic(ActiveMQDestination destination)
{
    String advisoryName =
        AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX + destination.getPhysicalName();
    return new ActiveMQTopic(advisoryName);
}
/**
 * Returns the part of {@code topic} that follows the first
 * AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX.length() characters.
 * The prefix itself is not checked, only its length is used.
 *
 * @param topic physical name of a no-consumer advisory topic
 * @return the remainder of the name after the prefix length
 */
public static String getNoConsumerAdvisoryTopic(String topic)
{
    int prefixLength = AdvisorySupport.NO_TOPIC_CONSUMERS_TOPIC_PREFIX.length();
    return topic.substring(prefixLength);
}
/**
 * Returns the part of {@code topic} that follows the first
 * AdvisorySupport.TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX.length() characters.
 * The prefix itself is not checked, only its length is used.
 *
 * @param topic physical name of a consumer advisory topic
 * @return the remainder of the name after the prefix length
 */
public static String getConsumerAdvisoryTopic(String topic)
{
    int prefixLength = AdvisorySupport.TOPIC_CONSUMER_ADVISORY_TOPIC_PREFIX.length();
    return topic.substring(prefixLength);
}
/**
 * Single listener for every subscription this class creates.
 *
 * - Consumer advisory carrying a ConsumerInfo: registers a publisher for
 *   the advised topic. Consumer advisories with any other payload are ignored.
 * - No-consumer advisory: removes the publisher for the advised topic.
 * - Non-advisory object message whose payload is a QuoteRequestMessage:
 *   starts a QuoteMessageRequester to answer it.
 *
 * @param message incoming message; cast to ActiveMQMessage, and its
 *                destination is cast to ActiveMQTopic
 */
public void onMessage(Message message)
{
    ActiveMQMessage activeMessage = (ActiveMQMessage) message;
    ActiveMQTopic destination = (ActiveMQTopic) activeMessage.getDestination();
    System.out.println(destination);

    if (AdvisorySupport.isAdvisoryTopic(destination))
    {
        if (AdvisorySupport.isConsumerAdvisoryTopic(destination))
        {
            // instanceof is false for null, so no separate null check is needed
            if (activeMessage.getDataStructure() instanceof ConsumerInfo)
            {
                String topic = getConsumerAdvisoryTopic(destination.getPhysicalName());
                addSubscription(topic);
            }
        }
        else if (isNoConsumerAdvisoryTopic(destination))
        {
            String topic = getNoConsumerAdvisoryTopic(destination.getPhysicalName());
            removeSubscription(topic);
        }
    }
    else if (activeMessage instanceof ActiveMQObjectMessage)
    { // for our temporary request topics
        ActiveMQObjectMessage requestMessage = (ActiveMQObjectMessage) activeMessage;
        try
        {
            if (requestMessage.getObject() instanceof QuoteRequestMessage)
            {
                // The requester thread starts itself in its constructor.
                new QuoteMessageRequester(requestMessage);
            }
        }
        catch (JMSException e)
        {
            logger.error(e);
            e.printStackTrace();
        }
    }
}
class QuoteListener extends Thread
{
InetAddress multiCastReceiveAddress;
int multiCastReceivePort;
MulticastSocket multiCastReceiveSocket;
DatagramPacket packet;
public QuoteListener(String receiveAddress, int receivePort)
{
try
{
this.multiCastReceiveAddress =
InetAddress.getByName(receiveAddress);
}
catch (UnknownHostException e)
{
logger.error(e);
e.printStackTrace();
}
this.multiCastReceivePort = receivePort;
try
{
multiCastReceiveSocket = new
MulticastSocket(multiCastReceivePort);
multiCastReceiveSocket.setTimeToLive(5);
multiCastReceiveSocket.joinGroup(multiCastReceiveAddress);
}
catch (Exception e)
{
logger.error(e);
e.printStackTrace();
}
byte[] message = new byte[1000];
packet = new DatagramPacket(message, message.length);
start();
}
public void run()
{
try
{
while (true)
{
multiCastReceiveSocket.receive(packet);
//long start =
System.currentTimeMillis();
processPacket();
//long end = System.currentTimeMillis()
- start;
//System.out.println("End of wait for "
+ end + " millis");
}
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void processPacket()
{
try
{
String topic = "TEST.QUOTE";
if (!subscribersForTopic(topic))
return;
String pretendObject= new String(topic + " " +
"stuff");
ActiveMQObjectMessage objectMessage =
(ActiveMQObjectMessage)
session.createObjectMessage();
objectMessage.setObject(pretendObject);
if (topic.length() > 0 && objectMessage != null)
publishToTopic(topic,
objectMessage);
}
catch (Exception e)
{
System.out.println(e.getStackTrace());
}
}
}
/**
 * One-shot worker thread that answers a single quote request by calling
 * sendReply() with a placeholder payload. The thread starts itself in the
 * constructor.
 */
class QuoteMessageRequester extends Thread
{
    ActiveMQObjectMessage requestMessage;

    /**
     * @param requestMessage the request to answer; passed on to sendReply()
     */
    QuoteMessageRequester(ActiveMQObjectMessage requestMessage)
    {
        this.requestMessage = requestMessage;
        start();
    }

    /**
     * Sends the reply. sendReply() builds its own message from the payload,
     * so nothing is created on the shared session from this thread.
     */
    public void run()
    {
        try
        {
            String pretendObject = "Quote stuff";
            sendReply(requestMessage, pretendObject);
        }
        catch (Exception e)
        {
            logger.error(e);
            e.printStackTrace();
        }
    }
}
/**
 * Sends {@code out} as an object message to the reply destination of
 * {@code in}, with the request's JMS message ID as the correlation ID.
 *
 * A dedicated connection and session are opened for the reply and the
 * connection is always closed afterwards, so connections do not accumulate
 * with each request. Failures are logged and printed, never thrown.
 *
 * @param in  request message; its JMSReplyTo must be a Topic
 * @param out serializable payload of the reply
 */
public synchronized void sendReply(Message in, Serializable out)
{
    ActiveMQConnection con = null;
    try
    {
        Destination replyDestination = in.getJMSReplyTo();
        if (replyDestination == null)
        {
            logger.error("Cannot reply: request has no JMSReplyTo destination");
            return;
        }
        con = createConnection();
        ActiveMQTopicSession ses = createSession(con);
        ActiveMQObjectMessage replyMessage =
            (ActiveMQObjectMessage) ses.createObjectMessage(out);
        replyMessage.setJMSCorrelationID(in.getJMSMessageID());
        ActiveMQTopicPublisher pub =
            (ActiveMQTopicPublisher) ses.createPublisher((Topic) replyDestination);
        pub.publish(replyMessage);
        pub.close();
        ses.close();
    }
    catch (Exception e)
    {
        logger.error(e);
        e.printStackTrace();
    }
    finally
    {
        if (con != null)
        {
            try
            {
                con.close();
            }
            catch (JMSException e)
            {
                logger.error(e);
                e.printStackTrace();
            }
        }
    }
}
/**
 * Starts the quote producer and then reads console lines until the user
 * types "quit" (any letter case), at which point the JVM exits.
 *
 * If standard input ends or fails, console handling stops but the JVM is
 * not terminated, so a server started without a console keeps running for
 * as long as its non-daemon threads are alive.
 *
 * @param args unused
 */
public static void main(String[] args)
{
    // Constructing the producer starts the broker, listeners and feed thread.
    new QuoteProducer();

    BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
    System.out.println("Enter 'quit' to exit\n");
    boolean quitRequested = false;
    try
    {
        String line;
        // readLine() returns null at end of input
        while ((line = br.readLine()) != null)
        {
            if (line.trim().equalsIgnoreCase("quit"))
            {
                quitRequested = true;
                break;
            }
        }
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
    if (quitRequested)
        System.exit(0);
}
}
--
View this message in context:
http://www.nabble.com/included%3A-fully-functional-%28almost%29-quote-producer-tf2482597.html#a6922824
Sent from the ActiveMQ - User mailing list archive at Nabble.com.