JanYork opened a new issue, #8154:
URL: https://github.com/apache/rocketmq/issues/8154

   ### Before Creating the Bug Report
   
   - [X] I found a bug, not just asking a question, which should be created in 
[GitHub Discussions](https://github.com/apache/rocketmq/discussions).
   
   - [X] I have searched the [GitHub 
Issues](https://github.com/apache/rocketmq/issues) and [GitHub 
Discussions](https://github.com/apache/rocketmq/discussions)  of this 
repository and believe that this is not a duplicate.
   
   - [X] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   The issue can be reproduced across multiple devices and various environments.
   
   ### RocketMQ version
   
   rocketmq-all-5.2.0-bin-release.
   
   ### JDK Version
   
   JDK17
   
   ### Describe the Bug
   
   In RocketMQ version 5.x, I created a FIFO type topic with both read and 
write queues set to one. Then, I sequentially produced messages using the 
client, with message content being numbers from 0 to 9. After that, I started a 
SimpleConsumer to consume messages. However, the order of numbers I received 
was: 1, 2, 3, 4, 5, 6, 7, 8, 9, 0. This is an issue because I didn't receive 0 
first! This implies that the method of fetching messages by the SimpleConsumer 
is not atomic. Moreover, this phenomenon does not occur every time; it can only 
be occasionally reproduced.
   
   I have tried on various platforms (MacOS and Windows) and experimented with 
different language clients (Java, Node.js, Go), all of which exhibit this 
issue. The fact that the atomic method for message retrieval fails to ensure 
message order consistently across these platforms and languages leads me to 
believe that this is a bug. Furthermore, I am using a single consumer, 
deploying RocketMQ on a single machine, and operating with a single producer. 
Additionally, I am not employing any asynchronous code; everything is executed 
synchronously.
   
   ### Steps to Reproduce
   
   You can also attempt to reproduce this issue in Java using the following 
code:
   
   First, you can create a FIFO type topic and set both the read and write 
queues to one. Then, produce messages to the same group.
   
   ```java
   import org.apache.rocketmq.client.apis.*;
   import org.apache.rocketmq.client.apis.producer.Producer;
   import org.apache.rocketmq.client.apis.producer.ProducerBuilder;
   import org.apache.rocketmq.client.apis.producer.TransactionChecker;
   
   /**
    * Each client will establish an independent connection to the server node 
within a process.
    *
    * <p>In most cases, the singleton mode can meet the requirements of higher 
concurrency.
    * If multiple connections are desired, consider increasing the number of 
clients appropriately.
    */
   public class ProducerSingleton {
       private static volatile Producer PRODUCER;
       private static volatile Producer TRANSACTIONAL_PRODUCER;
       private static final String ACCESS_KEY = "yourAccessKey";
       private static final String SECRET_KEY = "yourSecretKey";
       private static final String ENDPOINTS = "localhost:8081";
   
       private ProducerSingleton() {
       }
   
       private static Producer buildProducer(TransactionChecker checker, 
String... topics) throws ClientException {
           final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
           SessionCredentialsProvider sessionCredentialsProvider =
                   new StaticSessionCredentialsProvider(ACCESS_KEY, SECRET_KEY);
           ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
                   .setEndpoints(ENDPOINTS)
                   .enableSsl(false)
                   .setCredentialProvider(sessionCredentialsProvider)
                   .build();
           final ProducerBuilder builder = provider.newProducerBuilder()
                   .setClientConfiguration(clientConfiguration)
                   .setTopics(topics);
           if (checker != null) {
               // Set the transaction checker.
               builder.setTransactionChecker(checker);
           }
           return builder.build();
       }
   
       public static Producer getInstance(String... topics) throws 
ClientException {
           if (null == PRODUCER) {
               synchronized (ProducerSingleton.class) {
                   if (null == PRODUCER) {
                       PRODUCER = buildProducer(null, topics);
                   }
               }
           }
           return PRODUCER;
       }
   
       public static Producer getTransactionalInstance(TransactionChecker 
checker,
                                                       String... topics) throws 
ClientException {
           if (null == TRANSACTIONAL_PRODUCER) {
               synchronized (ProducerSingleton.class) {
                   if (null == TRANSACTIONAL_PRODUCER) {
                       TRANSACTIONAL_PRODUCER = buildProducer(checker, topics);
                   }
               }
           }
           return TRANSACTIONAL_PRODUCER;
       }
   }
   ```
   
   ```java
   import org.apache.rocketmq.client.apis.ClientException;
   import org.apache.rocketmq.client.apis.ClientServiceProvider;
   import org.apache.rocketmq.client.apis.message.Message;
   import org.apache.rocketmq.client.apis.producer.Producer;
   import org.apache.rocketmq.client.apis.producer.SendReceipt;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.io.IOException;
   import java.nio.charset.StandardCharsets;
   
   /**
    * @author muyouzhi
    */
   public class MessageProducer {
       private static final Logger log = 
LoggerFactory.getLogger(MessageProducer.class);
   
       private MessageProducer() {
       }
   
       public static void main(String[] args) throws ClientException, 
IOException {
           final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
   
           String topic = "checkout-topic-fifo";
           String group = "checkout-group";
           String tag = "checkout";
   
           final Producer producer = ProducerSingleton.getInstance(topic);
   
           for (int i = 0; i < 10; i++) {
               byte[] body = String.valueOf(i).getBytes(StandardCharsets.UTF_8);
   
               final Message message = provider.newMessageBuilder()
                       .setTopic(topic)
                       .setTag(tag)
                       .setMessageGroup(group)
                       .setKeys("message"+i)
                       .setBody(body)
                       .build();
   
   
               try {
                   final SendReceipt sendReceipt = producer.send(message);
                   log.info("Send message successfully, messageId={}", 
sendReceipt.getMessageId());
               } catch (Throwable t) {
                   log.error("Failed to send message", t);
               }
           }
   
            producer.close();
       }
   }
   ```
   
   ```java
   import org.apache.rocketmq.client.apis.*;
   import org.apache.rocketmq.client.apis.consumer.FilterExpression;
   import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
   import org.apache.rocketmq.client.apis.message.MessageId;
   import org.apache.rocketmq.client.apis.message.MessageView;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;
   
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;
   import java.time.Duration;
   import java.util.Collections;
   import java.util.List;
   
   /**
    * @author muyouzhi
    */
   public class SimpleConsumer {
       private static final Logger log = 
LoggerFactory.getLogger(SimpleConsumer.class);
   
       private SimpleConsumer() {
       }
   
       @SuppressWarnings({"resource", "InfiniteLoopStatement"})
       public static void main(String[] args) throws ClientException {
           final ClientServiceProvider provider = 
ClientServiceProvider.loadService();
   
           String accessKey = "yourAccessKey";
           String secretKey = "yourSecretKey";
           SessionCredentialsProvider sessionCredentialsProvider =
                   new StaticSessionCredentialsProvider(accessKey, secretKey);
   
           String endpoints = "localhost:8081";
           ClientConfiguration clientConfiguration = 
ClientConfiguration.newBuilder()
                   .setEndpoints(endpoints)
                   // .enableSsl(false)
                   .setCredentialProvider(sessionCredentialsProvider)
                   .build();
   
           String topic = "checkout-topic-fifo";
           String group = "checkout-group";
           String tag = "checkout";
           Duration awaitDuration = Duration.ofSeconds(30);
   
           FilterExpression filterExpression = new FilterExpression(tag, 
FilterExpressionType.TAG);
   
           org.apache.rocketmq.client.apis.consumer.SimpleConsumer consumer = 
provider.newSimpleConsumerBuilder()
                   .setClientConfiguration(clientConfiguration)
                   .setConsumerGroup(group)
                   .setAwaitDuration(awaitDuration)
                   .setSubscriptionExpressions(Collections.singletonMap(topic, 
filterExpression))
                   .build();
   
           int maxMessageNum = 1;
           Duration invisibleDuration = Duration.ofSeconds(15);
   
           do {
               final List<MessageView> messages = 
consumer.receive(maxMessageNum, invisibleDuration);
               log.info("Received {} message(s)", messages.size());
               for (MessageView message : messages) {
                   final MessageId messageId = message.getMessageId();
   
                   ByteBuffer buffer = message.getBody();
   
                   byte[] messageBytes = new byte[buffer.remaining()];
                   buffer.get(messageBytes);
   
                   String messageStr = new String(messageBytes, 
StandardCharsets.UTF_8);
   
                   try {
                       consumer.ack(message);
                       log.info("Message is acknowledged, messageId={}, 
message={}", messageId, messageStr);
                   } catch (Throwable t) {
                       log.error("Message is failed to be acknowledged, 
messageId={}", messageId, t);
                   }
               }
           } while (true);
   //         consumer.close();
       }
   }
   ```
   
   Then, run the MessageProducer class to sequentially send messages. After 
sending is complete, promptly run the consumer. If the issue does not occur, 
close the consumer and repeat the previous steps. This can be reproduced; I 
have asked some friends in technical exchange groups to do the same, and they 
have successfully reproduced it. They also believe it to be a bug.
   
   For Node.js, you can refer to the issue I raised previously: 
https://github.com/apache/rocketmq/issues/8133
   
   ### What Did You Expect to See?
   
   Messages in the message queue are consumed sequentially.
   
   ### What Did You See Instead?
   
   the log:
   ```sh
   14:59:05.735 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.756 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000001, message=1
   14:59:05.763 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.768 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000002, message=2
   14:59:05.776 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.782 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000003, message=3
   14:59:05.792 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.799 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000004, message=4
   14:59:05.811 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.821 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000005, message=5
   14:59:05.837 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.847 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000006, message=6
   14:59:05.855 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.866 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000007, message=7
   14:59:05.875 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.884 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000008, message=8
   14:59:05.895 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:05.902 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000009, message=9
   ```
   
   after waiting for some time:
   ```sh
   14:59:25.868 [main] INFO SimpleConsumer - Received 1 message(s)
   14:59:25.872 [main] INFO SimpleConsumer - Message is acknowledged, 
messageId=012EFD84B22E14BBE7065748B200000000, message=0
   ```
   
   ### Additional Context
   
   <img width="1033" alt="image" 
src="https://github.com/apache/rocketmq/assets/88621545/1e3d6a3d-6e5d-4cb2-bb7f-8a0619b9bffb";>
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to