This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 0e3f8b4d7 Fix the issue that TransactionMessageIT can not pass (#5394)
0e3f8b4d7 is described below

commit 0e3f8b4d7265b17b3592ec0432859d482bc6a9d1
Author: rongtong <[email protected]>
AuthorDate: Mon Oct 24 18:45:05 2022 +0800

    Fix the issue that TransactionMessageIT can not pass (#5394)
---
 .../test/container/TransactionMessageIT.java       | 35 ++++++++--------------
 1 file changed, 13 insertions(+), 22 deletions(-)

diff --git a/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java b/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java
index 06566e46f..e2e020d8c 100644
--- a/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/container/TransactionMessageIT.java
@@ -46,11 +46,11 @@ import static org.awaitility.Awaitility.await;
 public class TransactionMessageIT extends ContainerIntegrationTestBase {
 
     private static final String MESSAGE_STRING = RandomStringUtils.random(1024);
-    private static byte[] MESSAGE_BODY;
+    private static byte[] messageBody;
 
     static {
         try {
-            MESSAGE_BODY = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
+            messageBody = MESSAGE_STRING.getBytes(RemotingHelper.DEFAULT_CHARSET);
         } catch (UnsupportedEncodingException ignored) {
         }
     }
@@ -83,7 +83,7 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         producer.start();
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message msg = new Message(topic, MESSAGE_BODY);
+            Message msg = new Message(topic, messageBody);
             TransactionSendResult result = producer.sendMessageInTransaction(msg, null);
             assertThat(result.getLocalTransactionState()).isEqualTo(LocalTransactionState.COMMIT_MESSAGE);
         }
@@ -106,7 +106,6 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
     public void consumeTransactionMsgLocalEscape() throws Exception {
         final String topic = generateTopic();
         createTopicTo(master1With3Replicas, topic, 1, 1);
-        System.out.println("topic " + topic + " created");
 
         final String group = generateGroup();
         DefaultMQPushConsumer pushConsumer = createPushConsumer(group);
@@ -115,7 +114,6 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         Map<String, Message> msgSentMap = new HashMap<>();
         pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
             for (MessageExt msg : msgs) {
-                System.out.println("receive trans msgId=" + msg.getMsgId() + ", transactionId=" + msg.getTransactionId());
                 if (msgSentMap.containsKey(msg.getMsgId())) {
                     receivedMsgCount.incrementAndGet();
                 }
@@ -130,11 +128,10 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         producer.start();
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message msg = new Message(topic, MESSAGE_BODY);
+            Message msg = new Message(topic, messageBody);
             msg.setKeys(UUID.randomUUID().toString());
             SendResult result = producer.sendMessageInTransaction(msg, null);
             String msgId = result.getMsgId();
-            System.out.println("Sent trans msgid=" + msgId + ", transactionId=" + result.getTransactionId() + ", key=" + msg.getKeys());
 
             msgSentMap.put(msgId, msg);
         }
@@ -143,8 +140,8 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         brokerContainer1.removeBroker(new BrokerIdentity(master1With3Replicas.getBrokerIdentity().getBrokerClusterName(),
             master1With3Replicas.getBrokerIdentity().getBrokerName(),
             master1With3Replicas.getBrokerIdentity().getBrokerId()));
-        System.out.println("=========" + master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
-            + master1With3Replicas.getBrokerIdentity().getBrokerId() + " removed");
+        System.out.printf("=========" + master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
+            + master1With3Replicas.getBrokerIdentity().getBrokerId() + " removed%n");
         createTopicTo(master2With3Replicas, topic, 1, 1);
 
         transactionCheckListener.setShouldReturnUnknownState(false);
@@ -169,7 +166,6 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         pushConsumer2.subscribe(topic, "*");
         pushConsumer2.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
             for (MessageExt msg : msgs) {
-                System.out.println("[After master recovered] receive trans msgId=" + msg.getMsgId() + ", transactionId=" + msg.getTransactionId());
                 if (msgSentMap.containsKey(msg.getMsgId())) {
                     receivedMsgCount.incrementAndGet();
                 }
@@ -178,17 +174,15 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         });
         pushConsumer2.start();
-        System.out.println("Wait for checking...");
+        System.out.printf("Wait for checking...%n");
         Thread.sleep(10000L);
 
-
     }
 
     @Test
     public void consumeTransactionMsgRemoteEscape() throws Exception {
         final String topic = generateTopic();
         createTopicTo(master1With3Replicas, topic, 1, 1);
-        System.out.println("topic " + topic + " created");
 
         final String group = generateGroup();
 
@@ -198,7 +192,6 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         pushConsumer.subscribe(topic, "*");
         pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
             for (MessageExt msg : msgs) {
-                System.out.println("receive trans msgId=" + msg.getMsgId() + ", transactionId=" + msg.getTransactionId());
                 if (msgSentMap.containsKey(msg.getMsgId())) {
                     receivedMsgCount.incrementAndGet();
                 }
@@ -213,11 +206,10 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         producer.start();
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message msg = new Message(topic, MESSAGE_BODY);
+            Message msg = new Message(topic, messageBody);
             msg.setKeys(UUID.randomUUID().toString());
             SendResult result = producer.sendMessageInTransaction(msg, null);
             String msgId = result.getMsgId();
-            System.out.println("Sent trans msgid=" + msgId + ", transactionId=" + result.getTransactionId() + ", key=" + msg.getKeys());
 
             msgSentMap.put(msgId, msg);
         }
@@ -226,8 +218,8 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         brokerContainer1.removeBroker(new BrokerIdentity(master1With3Replicas.getBrokerIdentity().getBrokerClusterName(),
             master1With3Replicas.getBrokerIdentity().getBrokerName(),
             master1With3Replicas.getBrokerIdentity().getBrokerId()));
-        System.out.println("=========" + master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
-            + master1With3Replicas.getBrokerIdentity().getBrokerId() + " removed");
+        System.out.printf("=========" + master1With3Replicas.getBrokerIdentity().getBrokerName() + "-"
+            + master1With3Replicas.getBrokerIdentity().getBrokerId() + " removed%n");
 
         createTopicTo(master2With3Replicas, topic, 1, 1);
         createTopicTo(master3With3Replicas, topic, 1, 1);
@@ -235,9 +227,9 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         brokerContainer2.removeBroker(new BrokerIdentity(master2With3Replicas.getBrokerIdentity().getBrokerClusterName(),
             master2With3Replicas.getBrokerIdentity().getBrokerName(),
             master2With3Replicas.getBrokerIdentity().getBrokerId()));
-        System.out.println("=========" + master2With3Replicas.getBrokerIdentity().getBrokerClusterName() + "-"
+        System.out.printf("=========" + master2With3Replicas.getBrokerIdentity().getBrokerClusterName() + "-"
             + master2With3Replicas.getBrokerIdentity().getBrokerName()
-            + "-" + master2With3Replicas.getBrokerIdentity().getBrokerId() + " removed");
+            + "-" + master2With3Replicas.getBrokerIdentity().getBrokerId() + " removed%n");
 
         pushConsumer.getDefaultMQPushConsumerImpl().getRebalanceImpl().doRebalance(false);
         transactionCheckListener.setShouldReturnUnknownState(false);
@@ -268,7 +260,6 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
         pushConsumer2.subscribe(topic, "*");
         pushConsumer2.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
             for (MessageExt msg : msgs) {
-                System.out.println("[After master recovered] receive trans msgId=" + msg.getMsgId() + ", transactionId=" + msg.getTransactionId());
                 if (msgSentMap.containsKey(msg.getMsgId())) {
                     receivedMsgCount.incrementAndGet();
                 }
@@ -277,7 +268,7 @@ public class TransactionMessageIT extends ContainerIntegrationTestBase {
             return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
         });
         pushConsumer2.start();
-        System.out.println("Wait for checking...");
+        System.out.printf("Wait for checking...%n");
         Thread.sleep(10000L);
         assertThat(receivedMsgCount.get()).isEqualTo(0);
         pushConsumer2.shutdown();

Reply via email to