This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ac2e6ba6c [improve][test] Optimize TransactionEndToEndTest (#18522)
f3ac2e6ba6c is described below

commit f3ac2e6ba6cbbb5731246a6f4f16de0126a4304d
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Nov 24 11:14:46 2022 +0800

    [improve][test] Optimize TransactionEndToEndTest (#18522)
    
    ## Motivation
    1. fix flaky test https://github.com/apache/pulsar/issues/18466 caused by 
txn async send method
    2. decrease run time by optimizing receive method
    ## Modification
    1. fix flaky test
       * modify `producer.newMessage(txn1).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello 
Txn - " + i).getBytes(UTF_8)).send();`
    This also can be resolved by https://github.com/apache/pulsar/pull/17836 
and https://github.com/apache/pulsar/pull/18486 later.
    2. decrease run time by optimizing receive method
        * modify
     `    Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
                       Assert.assertNull(message);` to
                       `  Message<byte[]> message = consumer.receive(300, 
TimeUnit.MILLISECONDS);
                                Assert.assertNull(message);`
       * modify `message = consumer.receive();` to `message = 
consumer.receive(5, TimeUnit.SECONDS);`
       * keep other `consumer.receive(x, y)` no change.
---
 .../client/impl/TransactionEndToEndTest.java       | 75 +++++++++++-----------
 1 file changed, 36 insertions(+), 39 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index af18c2ed5c1..663c1c50ce7 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -146,15 +146,15 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         int messageCnt = 1000;
         for (int i = 0; i < messageCnt; i++) {
             if (i % 5 == 0) {
-                producer.newMessage(txn1).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
+                producer.newMessage(txn1).value(("Hello Txn - " + 
i).getBytes(UTF_8)).send();
             } else {
-                producer.newMessage(txn2).value(("Hello Txn - " + 
i).getBytes(UTF_8)).sendAsync();
+                producer.newMessage(txn2).value(("Hello Txn - " + 
i).getBytes(UTF_8)).send();
             }
             txnMessageCnt++;
         }
 
         // Can't receive transaction messages before commit.
-        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+        Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
         Assert.assertNull(message);
 
         txn1.commit().get();
@@ -162,16 +162,13 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         int receiveCnt = 0;
         for (int i = 0; i < txnMessageCnt; i++) {
-            message = consumer.receive();
+            message = consumer.receive(5, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             receiveCnt ++;
         }
         Assert.assertEquals(txnMessageCnt, receiveCnt);
 
-        message = consumer.receive(5, TimeUnit.SECONDS);
-        Assert.assertNull(message);
-
-        message = consumer.receive(5, TimeUnit.SECONDS);
+        message = consumer.receive(300, TimeUnit.MILLISECONDS);
         Assert.assertNull(message);
 
         // cleanup.
@@ -208,13 +205,13 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Awaitility.await().until(consumer::isConnected);
 
         // Can't receive transaction messages before abort.
-        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
         Assert.assertNull(message);
 
         txn.abort().get();
 
         // Cant't receive transaction messages after abort.
-        message = consumer.receive(2, TimeUnit.SECONDS);
+        message = consumer.receive(300, TimeUnit.MILLISECONDS);
         Assert.assertNull(message);
         Awaitility.await().until(() -> {
             boolean flag = true;
@@ -302,7 +299,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Transaction txn = getTxn();
 
         for (int i = 0; i < messageCount / 2; i++) {
-            Message<byte[]> message = consumer.receive();
+            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
             consumer.acknowledgeAsync(message.getMessageId(), txn).get();
         }
 
@@ -382,14 +379,14 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
             // consume and ack messages with txn
             for (int i = 0; i < messageCnt; i++) {
-                Message<byte[]> message = consumer.receive();
+                Message<byte[]> message = consumer.receive(5, 
TimeUnit.SECONDS);
                 Assert.assertNotNull(message);
                 log.info("receive msgId: {}, count : {}", 
message.getMessageId(), i);
                 consumer.acknowledgeAsync(message.getMessageId(), txn).get();
             }
 
             // the messages are pending ack state and can't be received
-            Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+            Message<byte[]> message = consumer.receive(300, 
TimeUnit.MILLISECONDS);
             Assert.assertNull(message);
 
             // 1) txn abort
@@ -408,7 +405,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             commitTxn.commit().get();
 
             // after transaction commit, the messages can't be received
-            message = consumer.receive(2, TimeUnit.SECONDS);
+            message = consumer.receive(300, TimeUnit.MILLISECONDS);
             Assert.assertNull(message);
 
             Field field = TransactionImpl.class.getDeclaredField("state");
@@ -445,7 +442,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .topic(topicTwo).subscriptionName(sub).subscribe();
         String content = "test";
         producer.send(content);
-        assertEquals(consumer.receive().getValue(), content);
+        assertEquals(consumer.receive(3, TimeUnit.SECONDS).getValue(), 
content);
 
         // cleanup.
         producer.close();
@@ -484,7 +481,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         log.info("produce transaction messages finished");
 
         // Can't receive transaction messages before commit.
-        Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+        Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
         Assert.assertNull(message);
         log.info("transaction messages can't be received before transaction 
committed");
 
@@ -493,7 +490,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         int ackedMessageCount = 0;
         int receiveCnt = 0;
         for (int i = 0; i < messageCnt; i++) {
-            message = consumer.receive();
+            message = consumer.receive(5, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
             receiveCnt ++;
             if (i % 2 == 0) {
@@ -503,7 +500,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
         Assert.assertEquals(messageCnt, receiveCnt);
 
-        message = consumer.receive(2, TimeUnit.SECONDS);
+        message = consumer.receive(300, TimeUnit.MILLISECONDS);
         Assert.assertNull(message);
 
         String checkTopic = TopicName.get(topic).getPartition(0).toString();
@@ -522,7 +519,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
         Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);
 
-        message = consumer.receive(2, TimeUnit.SECONDS);
+        message = consumer.receive(300, TimeUnit.MILLISECONDS);
         Assert.assertNull(message);
 
         topic = TopicName.get(topic).getPartition(0).toString();
@@ -638,7 +635,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             }
 
             // the messages are pending ack state and can't be received
-            message = consumer.receive(2, TimeUnit.SECONDS);
+            message = consumer.receive(300, TimeUnit.MILLISECONDS);
             Assert.assertNull(message);
 
             abortTxn.abort().get();
@@ -667,7 +664,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 Assert.assertTrue(reCommitError.getCause() instanceof 
TransactionNotFoundException);
             }
 
-            message = consumer.receive(1, TimeUnit.SECONDS);
+            message = consumer.receive(300, TimeUnit.MILLISECONDS);
             Assert.assertNull(message);
         }
 
@@ -729,7 +726,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Awaitility.await().until(consumer::isConnected);
 
         for (int i = 0; i < txnCnt * messageCnt; i++) {
-            Message<byte[]> message = consumer.receive();
+            Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
             Assert.assertNotNull(message);
         }
 
@@ -938,14 +935,14 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .withTransactionTimeout(3, TimeUnit.SECONDS)
                 .build().get();
 
-        Message<String> message = consumer.receive();
+        Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
 
         consumer.acknowledgeAsync(message.getMessageId(), 
consumeTimeoutTxn).get();
 
-        Message<String> reReceiveMessage = consumer.receive(2, 
TimeUnit.SECONDS);
+        Message<String> reReceiveMessage = consumer.receive(300, 
TimeUnit.MILLISECONDS);
         assertNull(reReceiveMessage);
 
-        reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
+        reReceiveMessage = consumer.receive(5, TimeUnit.SECONDS);
 
         assertEquals(reReceiveMessage.getValue(), message.getValue());
 
@@ -992,9 +989,9 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         }
         Transaction txn = getTxn();
         if (ackType == CommandAck.AckType.Individual) {
-            consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn);
+            consumer.acknowledgeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), txn);
         } else {
-            
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), txn);
+            consumer.acknowledgeCumulativeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), txn);
         }
         topic = TopicName.get(topic).toString();
         boolean exist = false;
@@ -1117,7 +1114,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                     .InvalidTxnStatusException);
         }
         try {
-            Message<String> message = consumer.receive();
+            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
             consumer.acknowledgeAsync(message.getMessageId(), 
transaction).get();
             Assert.fail();
         } catch (Exception e) {
@@ -1159,7 +1156,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         Message<byte[]> message = null;
         for (int i = 0; i < transactionCumulativeAck; i++) {
-            message = consumer.receive();
+            message = consumer.receive(5, TimeUnit.SECONDS);
         }
 
         // receive transaction in order
@@ -1182,7 +1179,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // receive the rest of the message
         for (int i = 0; i < count; i++) {
-            message = consumer.receive();
+            message = consumer.receive(5, TimeUnit.SECONDS);
         }
 
         Transaction commitTransaction = getTxn();
@@ -1195,7 +1192,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         commitTransaction.commit().get();
 
         // then redeliver will not receive any message
-        message = consumer.receive(3, TimeUnit.SECONDS);
+        message = consumer.receive(300, TimeUnit.MILLISECONDS);
         assertNull(message);
 
         // cleanup.
@@ -1270,7 +1267,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // receive the batch messages add to a list
         for (int i = 0; i < 5; i++) {
-            messageIds.add(consumer.receive().getMessageId());
+            messageIds.add(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId());
         }
 
         MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
@@ -1330,7 +1327,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .build().get();
 
         // consumer receive the message the first time, redeliverCount = 0
-        consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
transaction).get();
+        consumer.acknowledgeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), transaction).get();
 
         transaction.abort().get();
 
@@ -1338,13 +1335,13 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                 .build().get();
 
         // consumer receive the message the second time, redeliverCount = 1, 
also can be received
-        consumer.acknowledgeAsync(consumer.receive().getMessageId(), 
transaction).get();
+        consumer.acknowledgeAsync(consumer.receive(5, 
TimeUnit.SECONDS).getMessageId(), transaction).get();
 
         transaction.abort().get();
 
         // consumer receive the message the third time, redeliverCount = 2,
         // the message will be sent to DLQ, can't receive
-        assertNull(consumer.receive(3, TimeUnit.SECONDS));
+        assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
 
         assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
 
@@ -1394,7 +1391,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
                 .build().get();
 
-        Message<byte[]> message = consumer.receive();
+        Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
         assertEquals(value1, new String(message.getValue()));
         // consumer receive the batch message one the first time, 
redeliverCount = 0
         consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
@@ -1404,7 +1401,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
         // consumer will receive the batch message two and then receive
         // the message one and message two again, redeliverCount = 1
         for (int i = 0; i < 3; i ++) {
-            message = consumer.receive();
+            message = consumer.receive(5, TimeUnit.SECONDS);
         }
 
         transaction = pulsarClient.newTransaction().withTransactionTimeout(5, 
TimeUnit.MINUTES)
@@ -1418,7 +1415,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // consumer receive the batch message the third time, redeliverCount = 
2,
         // the message will be sent to DLQ, can't receive
-        assertNull(consumer.receive(3, TimeUnit.SECONDS));
+        assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
 
         assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 6);
 
@@ -1473,7 +1470,7 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
 
         // Failover consumer will receive the messages immediately while
         // the shared consumer will get them after the delay
-        Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
+        Message<String> msg = sharedConsumer.receive(300, 
TimeUnit.MILLISECONDS);
         assertNull(msg);
 
         for (int i = 0; i < 10; i++) {

Reply via email to