[ 
https://issues.apache.org/activemq/browse/AMQ-2937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=62038#action_62038
 ] 

James Mason commented on AMQ-2937:
----------------------------------

I also tested against 5.4.1 and got the same problem.

> kahadb log files not deleted when exceptions are thrown by consumers
> --------------------------------------------------------------------
>
>                 Key: AMQ-2937
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2937
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Message Store
>    Affects Versions: 5.4.1
>         Environment: OSX 10.5.8, Spring 2.5.6
>            Reporter: James Mason
>
> kahadb log files are not being cleaned up, and are building up over a number 
> of days. I've created a simple test that can reproduce the problem, and it 
> only reproduces the problem if my message consumer throws errors (in the test 
> case they are thrown 1 in 100 times). 
> The messages are re-consumed, and none of them are added to a DLQ. I can 
> provide the test code if this helps. I'm using AMQ v5.4.0, persistent queues, 
> and using AMQ embedded with Spring. 
> Test code:
> (The test needs to be kept running until several log files have been created)
> activemq.xml
> <beans xmlns="http://www.springframework.org/schema/beans"
>       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>       xsi:schemaLocation="http://www.springframework.org/schema/beans 
> http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
>   http://activemq.apache.org/schema/core 
> http://activemq.apache.org/schema/core/activemq-core.xsd   
>   http://camel.apache.org/schema/spring 
> http://camel.apache.org/schema/spring/camel-spring.xsd">
>       <broker xmlns="http://activemq.apache.org/schema/core"
>               brokerName="amq-broker" 
>               useJmx="true" 
>               destroyApplicationContextOnStop="true"
>               useShutdownHook="true">
>               
>               <persistenceAdapter>
>                       <kahaDB
>                               directory="/data"
>                               indexWriteBatchSize="1000" 
>                               enableIndexWriteAsync="true"
>                               enableJournalDiskSyncs="false" />
>               </persistenceAdapter>
>               <transportConnectors>
>                       <transportConnector 
> uri="tcp://localhost:61615?jms.prefetchPolicy.all=1" />
>               </transportConnectors>
>               <!-- JMX active -->
>               <managementContext>
>                       <managementContext createConnector="true" />
>               </managementContext>
>               <plugins>
>                       <statisticsBrokerPlugin />
>               </plugins>
>       </broker>
> </beans>
> 'Unit' Test (long running test...)
> public class AmqKahaDbTest extends IntegrationTestBase {
>       public void testKahaDbLogs() throws Exception {
>               for(int i = 0; i < 1000000; i++) {
>                       sendMessage(i);
>                       
>                       Thread.sleep(20);
>               }
>       }
>       
>       private void sendMessage(final int i) {
>               jmsTemplate.send(testDestination, new MessageCreator() {
>                       public Message createMessage(Session session) throws 
> JMSException {
>                               MapMessage message = session.createMapMessage();
>                               
>                               message.setString("testField1", "Test Message " 
> + i);
>                               
>                               return message;
>                       }
>               });
>       }
>       
>       private JmsTemplate jmsTemplate;
>       public void setJmsTemplate(JmsTemplate jmsTemplate) {
>               this.jmsTemplate = jmsTemplate;
>       }
>       
>       private Destination testDestination;
>       public void setTestDestination(Destination testDestination) {
>               this.testDestination = testDestination;
>       }
> }
> Message listener:
> public class TestMessageListener implements MessageListener {
>       
>       public void onMessage(Message message) {
>         if (message instanceof MapMessage) {
>               
>                       //read the type of subtask
>                       MapMessage mapMessage = (MapMessage)message;
>                       String messageText;
>                               try {
>                                       messageText = 
> mapMessage.getString("testField1");
>                                       long sleepTime = (long)(Math.random() * 
> 100d);
>                                       
>                                       log.info("Test message consume start, 
> sleep time: " + sleepTime);
>                                       
>                                       Thread.sleep(sleepTime);
>                                       
>                                       if(sleepTime == 50) {
>                                               //if commented out then the old 
> log files are deleted
>                                               throw new 
> RuntimeException("Random error!!!");
>                                       }
>                                       
>                                       log.info("Test message consume start: " 
> + messageText);
>                                       
>                               } catch (Exception e) {
>                                       log.error("Error consuming message", e);
>                                       throw new RuntimeException(e);
>                               }
>                       
>       } else {
>             throw new IllegalArgumentException("Message must be of type 
> Test");
>         }
>     }
>       
>       private Logger log = Logger.getLogger(TestMessageListener.class);
> }
> Spring config:
>       <bean id="broker" class="org.apache.activemq.xbean.BrokerFactoryBean">
>           <property name="config" value="classpath:activemq.xml" />
>           <property name="start" value="true" />
>       </bean>
>       <amq:connectionFactory id="amqConnectionFactory">
>               <property name="brokerURL" 
> value="vm://amq-broker:61615?jms.prefetchPolicy.all=1" />
>       </amq:connectionFactory>
>       
>       <!-- CachingConnectionFactory Definition, sessionCacheSize property is 
> the number of sessions to cache -->
>       <bean id="connectionFactory" 
> class="org.springframework.jms.connection.CachingConnectionFactory">
>           <constructor-arg ref="amqConnectionFactory" />
>           <property name="exceptionListener" ref="jmsExceptionListener" />
>           <property name="sessionCacheSize" value="100" />
>       </bean>
>       <bean id="testDestination" 
> class="org.apache.activemq.command.ActiveMQQueue">
>               <constructor-arg index="0" value="test.queue" />
>       </bean>
>       
>       <bean id="testListenerContainer" 
> class="org.springframework.jms.listener.DefaultMessageListenerContainer">
>           <property name="connectionFactory" ref="connectionFactory"/>
>           <property name="destination" ref="testDestination"/>
>           <property name="messageListener" ref="testMessageListener"/>
>           <property name="sessionTransacted" value="true"/>
>           <property name="maxConcurrentConsumers" value="10" />
>           <property name="exceptionListener" ref="jmsExceptionListener" />
>       </bean>
>       
>       <bean id="testMessageListener" class="TestMessageListener" />

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to