[ https://issues.apache.org/jira/browse/ARTEMIS-5787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dmitriy Nozdrya updated ARTEMIS-5787:
-------------------------------------
    Description: 
If an exception occurs during redistribution while a message is being received from the other broker, some messages are lost: part of them remain in the bridge queue, while the rest disappear.

Errors during redistribution must not cause message loss; for some use cases, losing messages is unacceptable.
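The requirement above can be stated as an invariant: a message leaves the bridge queue only after the remote broker has accepted it, so every sent message is either delivered or still queued. The toy model below is a sketch under that assumption (plain Java 17; `AckAfterSendModel`, `forwardAll` and `Result` are hypothetical names, not Artemis's bridge implementation), with a simulated remote side that rejects some messages.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.IntPredicate;

// Toy model only (hypothetical names): NOT how Artemis's bridge is implemented.
public class AckAfterSendModel {

   record Result(int delivered, int stillQueued) { }

   // One forwarding pass: each queued message is attempted once and is removed
   // from the bridge queue only when the simulated remote send succeeds.
   static Result forwardAll(int messageCount, IntPredicate sendSucceeds) {
      Deque<Integer> bridgeQueue = new ArrayDeque<>();
      for (int id = 0; id < messageCount; id++) {
         bridgeQueue.addLast(id);
      }
      int delivered = 0;
      for (int attempt = 0; attempt < messageCount; attempt++) {
         Integer id = bridgeQueue.peekFirst();            // inspect without removing
         if (sendSucceeds.test(id)) {
            bridgeQueue.pollFirst();                      // accepted remotely: only now remove it
            delivered++;
         } else {
            bridgeQueue.addLast(bridgeQueue.pollFirst()); // failed: keep it queued for a later retry
         }
      }
      return new Result(delivered, bridgeQueue.size());
   }

   public static void main(String[] args) {
      // Simulated remote side that rejects every message whose id is divisible by 8.
      Result r = forwardAll(100, id -> id % 8 != 0);
      int lost = 100 - r.delivered() - r.stillQueued();
      System.out.println("delivered=" + r.delivered() + " stillQueued=" + r.stillQueued() + " lost=" + lost);
   }
}
{code}

Running `main` prints `delivered=87 stillQueued=13 lost=0`: the 13 rejected messages (ids 0, 8, ..., 96) stay queued for a retry instead of disappearing, which is the behavior the report expects from redistribution.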

Reproduction steps:

1. Set up a symmetric cluster of two brokers (broker_1, broker_2) with message load balancing set to ON_DEMAND.
2. Connect a producer to broker_1 and send messages to the queue.
3. Connect a consumer to broker_2 to read the messages.
4. Simulate an exception in org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil#extractRegularMessage by making the method always throw:

 
{code:java}
private static Message extractRegularMessage(ICoreMessage message, StorageManager storageManager) {
   // Injected failure for the reproduction: always throw instead of decoding the message.
   RuntimeException exception = new RuntimeException("Exception occurred");
   logger.error("Error occurred for message={}", message, exception);
   throw exception;

   // Original implementation, disabled while the failure is injected:
   // ActiveMQBuffer buffer = message.getReadOnlyBodyBuffer();
   //
   // if (buffer.readableBytes() < signature.length || !checkSignature(buffer)) {
   //    logger.trace("Message type {} was used for something other than embed messages, ignoring content and treating as a regular message", Message.EMBEDDED_TYPE);
   //    return message;
   // }
   //
   // return readEncoded(message, storageManager, buffer);
}
{code}
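Commenting out the method body makes every call fail. As an alternative sketch, a small switch such as the hypothetical `FaultInjector` below (not part of Artemis; the class and method names are assumptions) could fail only a chosen number of calls and be disarmed without editing the method again, which may make partial-failure runs easier to compare.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical test helper (not part of Artemis): fails a configurable number of calls.
public final class FaultInjector {

   private static final AtomicInteger REMAINING_FAILURES = new AtomicInteger(0);

   private FaultInjector() { }

   // Arms the injector so that the next `count` calls to maybeFail() throw.
   public static void failNext(int count) {
      REMAINING_FAILURES.set(count);
   }

   // Throws while failures remain armed; otherwise returns normally.
   public static void maybeFail(String where) {
      // Decrement only while positive, so concurrent callers never push the counter below zero.
      if (REMAINING_FAILURES.getAndUpdate(n -> n > 0 ? n - 1 : 0) > 0) {
         throw new RuntimeException("Simulated failure in " + where);
      }
   }
}
{code}

With this helper, the patched method would start with `FaultInjector.maybeFail("EmbedMessageUtil#extractRegularMessage");` followed by the original, uncommented body, and the test would call `FaultInjector.failNext(n)` before sending.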
The issue can be reproduced with the attached test class [^AMQPMessageRedistributionFailedTest.java]:
{code:java}
package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import ...

public class AMQPMessageRedistributionFailedTest extends ClusterTestBase {...}
{code}
{code:java}
// While the test runs, the bridge logs a send failure:
AMQ224143: Bridge $.artemis.internal.sf.cluster0.1b68cb70-d032-11f0-8cc5-2a5a1ea9ca09 failed to send CoreMessage[messageID=178, durable=true, userID=null, priority=0, timestamp=0, expiration=0, durable=true, address=exampleQueue, size=102833, properties=TypedProperties[]]@1819912186: ActiveMQInternalErrorException

// Summary logged at the end of the test run:
[main] 13:23:30,142 INFO  [org.apache.activemq.artemis.tests.integration.cluster.distribution.AMQPMessageRedistributionFailedTest]
SUMMARY:
  messageCount=100 sent in exampleQueue queue for node 0
  messageCount=0 for exampleQueue queue for node 0
  messageCount=0 for exampleQueue queue for node 1
  messageCount=88 for $.artemis.internal.sf.cluster0.b3f833cd-d034-11f0-a96c-2a5a1ea9ca09 bridge_queue for node 0
  messageCount=0 for $.artemis.internal.sf.cluster1.b2d7336a-d034-11f0-a96c-2a5a1ea9ca09 bridge_queue for node 1
  LOST_MESSAGES_COUNT=12
{code}
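The LOST_MESSAGES_COUNT value is consistent with simple accounting: 100 messages sent, minus the 88 still in the node 0 bridge queue, with every other listed queue empty. The hypothetical helper below reproduces that arithmetic. Since the body of the attached test is not shown here, it assumes that a message counts as lost when it was neither consumed nor left in any listed queue, and that nothing was consumed in this run.

{code:java}
// Hypothetical helper reproducing the summary arithmetic; the "consumed" input is an assumption.
public class LostMessageCount {

   // lost = sent - consumed - (messages still held in any queue)
   static int lost(int sent, int consumed, int... remainingPerQueue) {
      int remaining = 0;
      for (int count : remainingPerQueue) {
         remaining += count;
      }
      return sent - consumed - remaining;
   }

   public static void main(String[] args) {
      // Queues in summary order: exampleQueue node 0, exampleQueue node 1,
      // bridge queue node 0, bridge queue node 1. Assumes nothing was consumed.
      System.out.println(lost(100, 0, 0, 0, 88, 0)); // prints 12
   }
}
{code}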
 



> Message loss during failed redistribution
> -----------------------------------------
>
>                 Key: ARTEMIS-5787
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-5787
>             Project: Artemis
>          Issue Type: Bug
>          Components: AMQP, Broker
>    Affects Versions: 2.33.0, 2.44.0
>         Environment:   
>            Reporter: Dmitriy Nozdrya
>            Priority: Critical
>         Attachments: AMQPMessageRedistributionFailedTest.java
>
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
