[ 
https://issues.apache.org/jira/browse/AMQ-6067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15413093#comment-15413093
 ] 

Czeslaw commented on AMQ-6067:
------------------------------

Hi,
I see another issue in amq 5.13.2 related with topic message expiration.   I 
have one producer which send me messages to MyTopic and one durable 
subscriber/consumer. Messages are stored in DB and after 30 seconds amq broker 
trying to expire these messages ( I have configuration entry for this: 
<amq:policyEntry topic="MyTopic.>" expireMessagesPeriod="30000"> ). I’m getting 
NullPointerException from AdvisoryBroker and messages are never expired and 
remain in activemq_msgs table.

java.lang.NullPointerException
        at 
org.apache.activemq.advisory.AdvisoryBroker.messageExpired(AdvisoryBroker.java:430)
        at 
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
        at 
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
        at 
org.apache.activemq.broker.BrokerFilter.messageExpired(BrokerFilter.java:313)
        at 
org.apache.activemq.broker.MutableBrokerFilter.messageExpired(MutableBrokerFilter.java:325)
        at 
org.apache.activemq.broker.region.Topic.messageExpired(Topic.java:776)
        at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:660)


Under debugger I see that in 
org.apache.activemq.advisory.AdvisoryBroker.messageExpired  baseDestination is 
null and call baseDestination.getActiveMQDestination() generates NPE

@Override
public void messageExpired(ConnectionContext context, MessageReference 
messageReference, Subscription subscription) {
    super.messageExpired(context, messageReference, subscription);
    try {
        if (!messageReference.isAdvisory()) {
            BaseDestination baseDestination = (BaseDestination) 
messageReference.getMessage().getRegionDestination();
            ActiveMQTopic topic = 
AdvisorySupport.getExpiredMessageTopic(baseDestination.getActiveMQDestination());

 Below is code how messages are produced:

public static class Producer implements Runnable {
    public void run() {
        try {
            ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(BROKER_ULR);
            Connection connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
            Topic destination = session.createTopic("MyTopic");

            MessageProducer producer = session.createProducer(destination);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            producer.setTimeToLive(30000);
          
            for(int i =0; i < 10; i ++) {
                String text = "Message number " + i +  "  From: " + 
Thread.currentThread().getName();
                TextMessage message = session.createTextMessage(text);
                message.setJMSDestination(destination);
                producer.send(destination, message);
                Thread.sleep(10000);
            }
            // Clean up
            session.close();
            connection.close();
        }
        catch (Exception e) {
            System.out.println("Caught: " + e);
            e.printStackTrace();
        }
    }
}

Under debugger I see that setter method setRegionDestination 
(org.apache.activemq.command.Message ) is called in 
org.apache.activemq.broker.region.Topic class before message is send:
  
@Override
public void send(final ProducerBrokerExchange producerExchange, final Message 
message) throws Exception {
    final ConnectionContext context = producerExchange.getConnectionContext();

    final ProducerInfo producerInfo = 
producerExchange.getProducerState().getInfo();
    producerExchange.incrementSend();
    final boolean sendProducerAck = !message.isResponseRequired() && 
producerInfo.getWindowSize() > 0
            && !context.isInRecoveryMode();

    message.setRegionDestination(this);

I don’t see call this method when message is loaded. 

Do you have any idea what can be wrong ? Is it issue in my messages (something 
is missing)  or it is another bug in amq code ?
 


> OutOfMemoryError when expiring big amount of topic messages
> -----------------------------------------------------------
>
>                 Key: AMQ-6067
>                 URL: https://issues.apache.org/jira/browse/AMQ-6067
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: JDBC
>    Affects Versions: 5.10.0
>            Reporter: Petr Havránek
>              Labels: durable, durable_subscription, expiration, jdbc, 
> timeToLive,
>
> There is a problem in
> {noformat}
> org.apache.activemq.broker.region.Topic.expireMessagesTask
> {noformat}
> When there are big amount of topic messages that are going to be expired, 
> this {{expireMessagesTask}} loads all of the messages to memory. This causes
> {noformat}
> 2015-11-24 11:05:46.359 WARN  [ActiveMQ Broker[JmsEngineActivemqBroker] 
> Scheduler] [Topic] Failed to browse Topic: test-topic
> java.lang.OutOfMemoryError: Java heap space
>       at oracle.sql.BLOB.getBytes(BLOB.java:204)
>       at oracle.jdbc.driver.T4CBlobAccessor.getBytes(T4CBlobAccessor.java:464)
>       at 
> oracle.jdbc.driver.OracleResultSetImpl.getBytes(OracleResultSetImpl.java:676)
>       at 
> org.apache.commons.dbcp.DelegatingResultSet.getBytes(DelegatingResultSet.java:203)
>       at 
> org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.getBinaryData(DefaultJDBCAdapter.java:80)
>       at 
> org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doRecover(DefaultJDBCAdapter.java:418)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at 
> org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:309)
>       at 
> org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
>       at 
> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
>       at 
> org.springframework.aop.interceptor.AbstractTraceInterceptor.invoke(AbstractTraceInterceptor.java:113)
>       at 
> org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
>       at 
> org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
>       at $Proxy14.doRecover(Unknown Source)
>       at 
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(JDBCMessageStore.java:236)
>       at 
> org.apache.activemq.store.ProxyTopicMessageStore.recover(ProxyTopicMessageStore.java:62)
>       at org.apache.activemq.broker.region.Topic.doBrowse(Topic.java:594)
>       at org.apache.activemq.broker.region.Topic.access$100(Topic.java:65)
>       at org.apache.activemq.broker.region.Topic$6.run(Topic.java:733)
>       at 
> org.apache.activemq.thread.SchedulerTimerTask.run(SchedulerTimerTask.java:33)
>       at java.util.TimerThread.mainLoop(Timer.java:512)
>       at java.util.TimerThread.run(Timer.java:462)
> {noformat}
> The problem happens when using JDBC persistency with ActiveMQ 5.10.0. After a 
> short look to source code, the same problem could be also with 5.12.1.
> Test case:
> - run ActiveMQ broker with JDBC persistency
> - create subscription to a topic, but do not receive the messages
> - send enough number of messages with short TimeToLive
> - when expireMessagesTask is scheduled, it tries to load all of the messages 
> and causes the OutOfMemoryError
> It would be fine if
> {noformat}
> org.apache.activemq.store.jdbc.JDBCMessageStore.recover(MessageRecoveryListener)
> {noformat}
> will be updated like this:
> {code:java}
> public void recover(final MessageRecoveryListener listener) throws Exception {
>   // Get all the Message ids out of the database.
>   TransactionContext c = persistenceAdapter.getTransactionContext();
>   try {
>     c = persistenceAdapter.getTransactionContext();
>     adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
>       public boolean recoverMessage(long sequenceId, byte[] data) throws 
> Exception {
>         if (listener.hasSpace()) {
>           Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
>           msg.getMessageId().setBrokerSequenceId(sequenceId);
>           return listener.recoverMessage(msg);
>         } else {
>           logger.debug("Recovery limit of the messages has exceeded.");
>           return false;
>         }                    
>       }
>       public boolean recoverMessageReference(String reference) throws 
> Exception {
>         if (listener.hasSpace()) {
>           return listener.recoverMessageReference(new MessageId(reference));
>         } else {
>           logger.debug("Recovery limit of the message references has 
> exceeded.");
>           return false;
>         }
>       }
>     });
>   } catch (SQLException e) {
>     JDBCPersistenceAdapter.log("JDBC Failure: ", e);
>     throw IOExceptionSupport.create("Failed to recover container. Reason: " + 
> e, e);
>   } finally {
>     c.close();
>   }
> }
> {code}
> But I am not sure if this limitation is the best way, because there will be 
> some messages that should be expired, but need to wait. So better solution 
> might be to do this job in more separated transactions.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to