kahadb log files not deleted when exceptions are thrown by consumers
--------------------------------------------------------------------

                 Key: AMQ-2937
                 URL: https://issues.apache.org/activemq/browse/AMQ-2937
             Project: ActiveMQ
          Issue Type: Bug
          Components: Message Store
    Affects Versions: 5.4.1
         Environment: OSX 10.5.8, Spring 2.5.6
            Reporter: James Mason


kahadb log files are not being cleaned up, and are building up over a number of 
days. I've created a simple test that can reproduce the problem, and it only 
reproduces the problem if my message consumer throws errors (in the test case 
they are thrown 1 in 100 times). 

The messages are re-consumed, and none of them are added to a DLQ. I can 
provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, 
and using AMQ embedded with Spring. 

Test code:
(The test needs to be kept running until several log files have been created)


activemq.xml
<beans xmlns="http://www.springframework.org/schema/beans";
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
  http://activemq.apache.org/schema/core 
http://activemq.apache.org/schema/core/activemq-core.xsd   
  http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd";>

        <broker xmlns="http://activemq.apache.org/schema/core";
                brokerName="amq-broker" 
                useJmx="true" 
                destroyApplicationContextOnStop="true"
                useShutdownHook="true">
                
                <persistenceAdapter>
                        <kahaDB
                                directory="/data"
                                indexWriteBatchSize="1000" 
                                enableIndexWriteAsync="true"
                                enableJournalDiskSyncs="false" />
                </persistenceAdapter>

                <transportConnectors>
                        <transportConnector 
uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
                </transportConnectors>

                <!-- JMX active -->
                <managementContext>
                        <managementContext createConnector="true" />
                </managementContext>

                <plugins>
                        <statisticsBrokerPlugin />
                </plugins>
        </broker>
</beans>




'Unit' Test (long running test...)

public class AmqKahaDbTest extends IntegrationTestBase {

        public void testKahaDbLogs() throws Exception {
                for(int i = 0; i < 1000000; i++) {
                        sendMessage(i);
                        
                        Thread.sleep(20);
                }
        }
        
        private void sendMessage(final int i) {
                jmsTemplate.send(testDestination, new MessageCreator() {
                        public Message createMessage(Session session) throws 
JMSException {
                                MapMessage message = session.createMapMessage();
                                
                                message.setString("testField1", "Test Message " 
+ i);
                                
                                return message;
                        }
                });
        }
        
        private JmsTemplate jmsTemplate;
        public void setJmsTemplate(JmsTemplate jmsTemplate) {
                this.jmsTemplate = jmsTemplate;
        }
        
        private Destination testDestination;
        public void setTestDestination(Destination testDestination) {
                this.testDestination = testDestination;
        }
}





Message listener:

public class TestMessageListener implements MessageListener {
        
        public void onMessage(Message message) {
        if (message instanceof MapMessage) {
                
                        //read the type of subtask
                        MapMessage mapMessage = (MapMessage)message;
                        String messageText;
                                try {
                                        messageText = 
mapMessage.getString("testField1");
                                        long sleepTime = (long)(Math.random() * 
100d);
                                        
                                        log.info("Test message consume start, 
sleep time: " + sleepTime);
                                        
                                        Thread.sleep(sleepTime);
                                        
                                        if(sleepTime == 50) {
                                                //if commented out then the old 
log files are deleted
                                                throw new 
RuntimeException("Random error!!!");
                                        }
                                        
                                        log.info("Test message consume start: " 
+ messageText);
                                        
                                } catch (Exception e) {
                                        log.error("Error consuming message", e);
                                        throw new RuntimeException(e);
                                }
                        
        } else {
            throw new IllegalArgumentException("Message must be of type Test");
        }
    }
        
        private Logger log = Logger.getLogger(TestMessageListener.class);
}




Spring config:

        <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
            <property name="config" value="classpath:activemq.xml" />
            <property name="start" value="true" />
        </bean>

        <amq:connectionFactory id="amqConnectionFactory">
                <property name="brokerURL" 
value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
        </amq:connectionFactory>
        
        <!-- CachingConnectionFactory Definition, sessionCacheSize property is 
the number of sessions to cache -->
        <bean id="connectionFactory" 
class="org.springframework.jms.connection.CachingConnectionFactory">
            <constructor-arg ref="amqConnectionFactory" />
            <property name="exceptionListener" ref="jmsExceptionListener" />
            <property name="sessionCacheSize" value="100" />
        </bean>

        <bean id="testDestination" 
class="org.apache.activemq.command.ActiveMQQueue">
                <constructor-arg index="0" value="test.queue" />
        </bean>
        
        <bean id="testListenerContainer" 
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="testDestination"/>
            <property name="messageListener" ref="testMessageListener"/>
            <property name="sessionTransacted" value="true"/>
            <property name="maxConcurrentConsumers" value="10" />
            <property name="exceptionListener" ref="jmsExceptionListener" />
        </bean>
        
        <bean id="testMessageListener" class="TestMessageListener" />




-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to