[ 
https://issues.apache.org/jira/browse/AMQ-6142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15121768#comment-15121768
 ] 

Claudio Tagliola commented on AMQ-6142:
---------------------------------------

{noformat]
package com.foo.bar.activemqtest;

import javax.jms.*;
import javax.jms.Connection;
import javax.jms.Message;

import org.apache.activemq.*;
import org.apache.activemq.broker.*;
import org.junit.*;

public class ActiveMQBytesMessageCorruptionTest
{
    private volatile AssertionError assertionError;

    @Test
    public void bytesMessageCorruption() throws Exception
    {
        BrokerService brokerService = new BrokerService();
        brokerService.setBrokerName("embedded");
        brokerService.setPersistent(false);
        brokerService.start();

        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory("vm://embedded");
        connectionFactory.setUseCompression(true);

        Connection connection = connectionFactory.createConnection();
        connection.start();

        for (int i = 0; i < 10; i++)
        {
            Session mySession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
            
mySession.createConsumer(mySession.createTopic("foo.bar")).setMessageListener(this::onMessage);
        }

        Session producerSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
        MessageProducer messageProducer = 
producerSession.createProducer(producerSession.createTopic("foo.bar"));

        for (int i = 0; i < 1000; i++)
        {
            BytesMessage bytesMessage = producerSession.createBytesMessage();
            bytesMessage.writeBytes(new byte[0]);
            messageProducer.send(bytesMessage);

            if (assertionError != null)
            {
                throw assertionError;
            }
        }

    }

    private void onMessage(Message message)
    {
        try
        {
            ((BytesMessage) message).getBodyLength();
        }
        catch (JMSException | Error e)
        {
            assertionError = new AssertionError("Exception in thread", e);
        }
    }
}
{noformat}
Unit test will fail most of the time with:
{noformat}
java.lang.AssertionError: Exception in thread
        at 
com.foo.bar.activemqtest.ActiveMQBytesMessageCorruptionTest.onMessage(ActiveMQBytesMessageCorruptionTest.java:60)
        at 
org.apache.activemq.ActiveMQMessageConsumer.dispatch(ActiveMQMessageConsumer.java:1393)
        at 
org.apache.activemq.ActiveMQSessionExecutor.dispatch(ActiveMQSessionExecutor.java:131)
        at 
org.apache.activemq.ActiveMQSessionExecutor.iterate(ActiveMQSessionExecutor.java:202)
        at 
org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
        at 
org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: Java heap space
        at 
org.apache.activemq.command.ActiveMQBytesMessage.decompress(ActiveMQBytesMessage.java:897)
        at 
org.apache.activemq.command.ActiveMQBytesMessage.initializeReading(ActiveMQBytesMessage.java:876)
        at 
org.apache.activemq.command.ActiveMQBytesMessage.getBodyLength(ActiveMQBytesMessage.java:198)
        at 
com.foo.bar.activemqtest.ActiveMQBytesMessageCorruptionTest.onMessage(ActiveMQBytesMessageCorruptionTest.java:56)
        at 
com.foo.bar.activemqtest.ActiveMQBytesMessageCorruptionTest$$Lambda$3/665188480.onMessage(Unknown
 Source)
        ... 8 more
{noformat}


> ActiveMQBytesMessage decompress throws DataFormatException incorrect header 
> check
> ---------------------------------------------------------------------------------
>
>                 Key: AMQ-6142
>                 URL: https://issues.apache.org/jira/browse/AMQ-6142
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 5.10.2, 5.12.1, 5.11.3, 5.13.0
>            Reporter: Claudio Tagliola
>         Attachments: Client.java, MessageListener.java, Server.java, pom.xml
>
>
> In our environment we use an embedded broker. On one topic where compression 
> is enabled, the server is also listening in on the messages. From ActiveMQ 
> 5.10.0 up to 5.13.0, we encounter DataFormatException: incorrect header check 
> exceptions on the tcp clients due to corruption of the payload. Attached are 
> a test server and client. At some point, the client will exit due to 
> mentioned exception. Increase chances by running multiple clients. This 
> scenario works with 5.8.0 and 5.9.1.
> If the server has multiple consumers on the same topic, they will encounter 
> corruption as well, but this has other side-effects.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to