[
https://issues.apache.org/jira/browse/AMQ-5537?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14294913#comment-14294913
]
Ehud Eshet commented on AMQ-5537:
---------------------------------
I hope I was not too rude by specifying Fix Version as 5.11.
Where is the place or the mechanism to discuss such enhancement requests?
> Network Connector Throughput
> ----------------------------
>
> Key: AMQ-5537
> URL: https://issues.apache.org/jira/browse/AMQ-5537
> Project: ActiveMQ
> Issue Type: Improvement
> Components: Connector
> Affects Versions: 5.x
> Environment: Network of Brokers. Platform agnostic. Local Broker has
> a networkConnector defined to forward all messages to a remote broker.
> Reporter: Ehud Eshet
>
> *Requirement*
> 1. Allow network connector to use transactions when forwarding persistent
> messages.
> 2. Provide the following new network connector properties:
> maxMessagesPerTransaction - when specified and great than 1, use transactions.
> maxTransactionLatencyMillis - commit immediately when time passed since last
> commit is more than specified.
> Let's say both parameters are set as 1000.
> Network connector should commit after every 1000 messages or when more than
> 1000ms passed since last commit (the sooner).
> *Background*
> Persistent messages throughput is significantly slower.
> When using transactions and committing every 1000 messages, throughput on
> local broker with levelDB is about 12,000 messages of 1KB per second.
> Network connector does not use transactions. Thus, its throughput is limited
> to few hundreds messages per second.
> When imitating network connector functionality (receive from local broker and
> send to remote broker) using transactions on both sessions, I managed to have
> a sustained throughput of 10,000 messages/sec stored on local broker plus up
> to 11,000 messages/s forwarded to remote broker (forwarding throughput must
> be higher to allow catch up after reconnect).
> *Sample code*
> {code:title=TransactionalStoreAndForward.java|borderStyle=solid}
> import java.util.Date;
> import javax.jms.*;
> import javax.jms.Connection;
> import javax.jms.Message;
> import org.apache.activemq.*;
> import org.apache.activemq.broker.*;
> public class TransactionalStoreAndForward implements Runnable
> {
> private final String m_queueName;
> private final ActiveMQConnectionFactory m_fromAMQF, m_toAMQF;
>
> private Connection m_fromConn = null, m_toConn = null;
> private Session m_fromSess = null, m_toSess = null;
> private MessageConsumer m_msgConsumer = null;
> private MessageProducer m_msgProducer = null;
>
> private boolean m_cont = true;
>
> public static final int MAX_MESSAGES_PER_TRANSACTION = 500;
> public static final long MAX_TRANSACTION_LATENCY_MILLIS = 5000L;
>
> public TransactionalStoreAndForward(String fromUri, String toUri,
> String queueName)
> {
> m_fromAMQF = new ActiveMQConnectionFactory(fromUri);
> m_toAMQF = new ActiveMQConnectionFactory(toUri);
> m_queueName = queueName;
> }
>
> @Override
> public void run()
> {
> while (m_cont)
> {
> connect();
> process();
> }
> }
>
> private void process()
> {
> long txMessages = 0, totalMessages = 0, lastPrintMessages = 0;
> long startTime = 0L;
> long lastTxTime = startTime, lastPrintTime = startTime;
>
> Message msg = null;
>
> try {
> while (m_cont)
> {
> while ((msg =
> m_msgConsumer.receive(MAX_TRANSACTION_LATENCY_MILLIS)) != null)
> {
> if (startTime == 0) {
> startTime =
> System.currentTimeMillis();
> lastTxTime = startTime;
> lastPrintTime = startTime;
> }
>
> m_msgProducer.send(msg);
> txMessages++;
> totalMessages++;
>
> if (txMessages ==
> MAX_MESSAGES_PER_TRANSACTION ||
>
> System.currentTimeMillis() - lastTxTime > MAX_TRANSACTION_LATENCY_MILLIS)
> {
> m_toSess.commit();
> m_fromSess.commit();
> lastTxTime =
> System.currentTimeMillis();
> txMessages = 0;
> }
>
> if (System.currentTimeMillis() -
> lastPrintTime > 10000L) {
> System.out.println("processed "
> + (totalMessages - lastPrintMessages) + " messages during last 10 seconds.
> Avg. messages/s: " + (totalMessages * 1000L / (System.currentTimeMillis() -
> startTime)) + " at " + new Date());
> lastPrintTime =
> System.currentTimeMillis();
> lastPrintMessages =
> totalMessages;
> }
> }
>
> if (txMessages > 0)
> {
> m_toSess.commit();
> m_fromSess.commit();
> lastTxTime = System.currentTimeMillis();
> txMessages = 0;
> }
> else {
> System.out.println("Idle for more than
> a minute at " + new Date());
> }
> }
> }
> catch(JMSException jmse)
> {
> System.out.println("About to rollback " + txMessages +
> " messages due to: " + jmse.getMessage());
> try {
> m_toSess.rollback();
> m_fromSess.rollback();
> System.out.println("Rollback completed. will
> reconnect soon ...");
> }
> catch (JMSException re)
> {
> System.out.println("Rollback failed !!!");
> re.printStackTrace();
> }
> }
> }
>
> private void connect()
> {
> boolean isNotOK = true;
> String target = null;
> while (isNotOK)
> {
> try {
> if (m_fromConn != null)
> {
> m_fromConn.close();
> m_fromConn = null;
> }
>
> if (m_toConn != null)
> {
> m_toConn.close();
> m_toConn = null;
> }
>
> target = m_fromAMQF.getBrokerURL();
> m_fromConn = m_fromAMQF.createConnection();
> m_fromConn.start();
> m_fromSess = m_fromConn.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
> Destination fromDest =
> m_fromSess.createQueue(m_queueName);
> m_msgConsumer =
> m_fromSess.createConsumer(fromDest);
>
> target = m_toAMQF.getBrokerURL();
> m_toConn = m_toAMQF.createConnection();
> m_toConn.start();
> m_toSess = m_toConn.createSession(true,
> Session.AUTO_ACKNOWLEDGE);
> Destination toDest =
> m_toSess.createQueue(m_queueName);
> m_msgProducer = m_toSess.createProducer(toDest);
> isNotOK = false;
> System.out.println("Successful connection at "
> + new Date());
> }
> catch(Exception e) {
> System.out.println("Failed to connect to " +
> target + " due to: " + e.getMessage());
> try {
> Thread.sleep(60000L);
> } catch (InterruptedException e1) {}
>
> System.out.println("About to retry connection
> at " + new Date());
> }
> }
> }
>
> public void cleanup() throws Exception
> {
> m_cont = false;
>
> if (m_fromConn != null)
> {
> m_fromConn.close();
> m_fromConn = null;
> }
>
> if (m_toConn != null)
> {
> m_toConn.close();
> m_toConn = null;
> }
> }
>
> public static void main(String[] args) throws Exception
> {
> BrokerService broker =
> BrokerFactory.createBroker("xbean:activemq_gateway.xml", true);
> broker.waitUntilStarted();
> TransactionalStoreAndForward tsaf = new
> TransactionalStoreAndForward("vm://AuditGW", "tcp://10.2.154.51:61616",
> "AUDIT.EVENT");
> Thread t = new Thread(tsaf);
> t.start();
> t.join();
> tsaf.cleanup();
> broker.stop();
> broker.waitUntilStopped();
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)