This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f3ac2e6ba6c [improve][test] Optimize TransactionEndToEndTest (#18522)
f3ac2e6ba6c is described below
commit f3ac2e6ba6cbbb5731246a6f4f16de0126a4304d
Author: Xiangying Meng <[email protected]>
AuthorDate: Thu Nov 24 11:14:46 2022 +0800
[improve][test] Optimize TransactionEndToEndTest (#18522)
## Motivation
1. fix flaky test https://github.com/apache/pulsar/issues/18466 caused by
txn async send method
2. decrease run time by optimizing receive method
## Modification
1. fix flaky test
* modify `producer.newMessage(txn1).value(("Hello Txn - " +
i).getBytes(UTF_8)).sendAsync();` to `producer.newMessage(txn1).value(("Hello
Txn - " + i).getBytes(UTF_8)).send();`
This also can be resolved by https://github.com/apache/pulsar/pull/17836
and https://github.com/apache/pulsar/pull/18486 later.
2. decrease run time by optimizing receive method
* modify
` Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNull(message);` to
` Message<byte[]> message = consumer.receive(300,
TimeUnit.MILLISECONDS);
Assert.assertNull(message);`
* modify `message = consumer.receive();` to `message =
consumer.receive(5, TimeUnit.SECONDS);`
* keep other `consumer.receive(x, y)` no change.
---
.../client/impl/TransactionEndToEndTest.java | 75 +++++++++++-----------
1 file changed, 36 insertions(+), 39 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index af18c2ed5c1..663c1c50ce7 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -146,15 +146,15 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
int messageCnt = 1000;
for (int i = 0; i < messageCnt; i++) {
if (i % 5 == 0) {
- producer.newMessage(txn1).value(("Hello Txn - " +
i).getBytes(UTF_8)).sendAsync();
+ producer.newMessage(txn1).value(("Hello Txn - " +
i).getBytes(UTF_8)).send();
} else {
- producer.newMessage(txn2).value(("Hello Txn - " +
i).getBytes(UTF_8)).sendAsync();
+ producer.newMessage(txn2).value(("Hello Txn - " +
i).getBytes(UTF_8)).send();
}
txnMessageCnt++;
}
// Can't receive transaction messages before commit.
- Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
txn1.commit().get();
@@ -162,16 +162,13 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
int receiveCnt = 0;
for (int i = 0; i < txnMessageCnt; i++) {
- message = consumer.receive();
+ message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
receiveCnt ++;
}
Assert.assertEquals(txnMessageCnt, receiveCnt);
- message = consumer.receive(5, TimeUnit.SECONDS);
- Assert.assertNull(message);
-
- message = consumer.receive(5, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
// cleanup.
@@ -208,13 +205,13 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Awaitility.await().until(consumer::isConnected);
// Can't receive transaction messages before abort.
- Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+ Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
txn.abort().get();
// Cant't receive transaction messages after abort.
- message = consumer.receive(2, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
Awaitility.await().until(() -> {
boolean flag = true;
@@ -302,7 +299,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Transaction txn = getTxn();
for (int i = 0; i < messageCount / 2; i++) {
- Message<byte[]> message = consumer.receive();
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}
@@ -382,14 +379,14 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// consume and ack messages with txn
for (int i = 0; i < messageCnt; i++) {
- Message<byte[]> message = consumer.receive();
+ Message<byte[]> message = consumer.receive(5,
TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("receive msgId: {}, count : {}",
message.getMessageId(), i);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}
// the messages are pending ack state and can't be received
- Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+ Message<byte[]> message = consumer.receive(300,
TimeUnit.MILLISECONDS);
Assert.assertNull(message);
// 1) txn abort
@@ -408,7 +405,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
commitTxn.commit().get();
// after transaction commit, the messages can't be received
- message = consumer.receive(2, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
Field field = TransactionImpl.class.getDeclaredField("state");
@@ -445,7 +442,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
- assertEquals(consumer.receive().getValue(), content);
+ assertEquals(consumer.receive(3, TimeUnit.SECONDS).getValue(),
content);
// cleanup.
producer.close();
@@ -484,7 +481,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
log.info("produce transaction messages finished");
// Can't receive transaction messages before commit.
- Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
+ Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
log.info("transaction messages can't be received before transaction
committed");
@@ -493,7 +490,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
int ackedMessageCount = 0;
int receiveCnt = 0;
for (int i = 0; i < messageCnt; i++) {
- message = consumer.receive();
+ message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
receiveCnt ++;
if (i % 2 == 0) {
@@ -503,7 +500,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
Assert.assertEquals(messageCnt, receiveCnt);
- message = consumer.receive(2, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
String checkTopic = TopicName.get(topic).getPartition(0).toString();
@@ -522,7 +519,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);
- message = consumer.receive(2, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
topic = TopicName.get(topic).getPartition(0).toString();
@@ -638,7 +635,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
// the messages are pending ack state and can't be received
- message = consumer.receive(2, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
abortTxn.abort().get();
@@ -667,7 +664,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Assert.assertTrue(reCommitError.getCause() instanceof
TransactionNotFoundException);
}
- message = consumer.receive(1, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
Assert.assertNull(message);
}
@@ -729,7 +726,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Awaitility.await().until(consumer::isConnected);
for (int i = 0; i < txnCnt * messageCnt; i++) {
- Message<byte[]> message = consumer.receive();
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
}
@@ -938,14 +935,14 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.withTransactionTimeout(3, TimeUnit.SECONDS)
.build().get();
- Message<String> message = consumer.receive();
+ Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(),
consumeTimeoutTxn).get();
- Message<String> reReceiveMessage = consumer.receive(2,
TimeUnit.SECONDS);
+ Message<String> reReceiveMessage = consumer.receive(300,
TimeUnit.MILLISECONDS);
assertNull(reReceiveMessage);
- reReceiveMessage = consumer.receive(2, TimeUnit.SECONDS);
+ reReceiveMessage = consumer.receive(5, TimeUnit.SECONDS);
assertEquals(reReceiveMessage.getValue(), message.getValue());
@@ -992,9 +989,9 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
Transaction txn = getTxn();
if (ackType == CommandAck.AckType.Individual) {
- consumer.acknowledgeAsync(consumer.receive().getMessageId(), txn);
+ consumer.acknowledgeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), txn);
} else {
-
consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), txn);
+ consumer.acknowledgeCumulativeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), txn);
}
topic = TopicName.get(topic).toString();
boolean exist = false;
@@ -1117,7 +1114,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.InvalidTxnStatusException);
}
try {
- Message<String> message = consumer.receive();
+ Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(),
transaction).get();
Assert.fail();
} catch (Exception e) {
@@ -1159,7 +1156,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Message<byte[]> message = null;
for (int i = 0; i < transactionCumulativeAck; i++) {
- message = consumer.receive();
+ message = consumer.receive(5, TimeUnit.SECONDS);
}
// receive transaction in order
@@ -1182,7 +1179,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// receive the rest of the message
for (int i = 0; i < count; i++) {
- message = consumer.receive();
+ message = consumer.receive(5, TimeUnit.SECONDS);
}
Transaction commitTransaction = getTxn();
@@ -1195,7 +1192,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
commitTransaction.commit().get();
// then redeliver will not receive any message
- message = consumer.receive(3, TimeUnit.SECONDS);
+ message = consumer.receive(300, TimeUnit.MILLISECONDS);
assertNull(message);
// cleanup.
@@ -1270,7 +1267,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// receive the batch messages add to a list
for (int i = 0; i < 5; i++) {
- messageIds.add(consumer.receive().getMessageId());
+ messageIds.add(consumer.receive(5,
TimeUnit.SECONDS).getMessageId());
}
MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
@@ -1330,7 +1327,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.build().get();
// consumer receive the message the first time, redeliverCount = 0
- consumer.acknowledgeAsync(consumer.receive().getMessageId(),
transaction).get();
+ consumer.acknowledgeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), transaction).get();
transaction.abort().get();
@@ -1338,13 +1335,13 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.build().get();
// consumer receive the message the second time, redeliverCount = 1,
also can be received
- consumer.acknowledgeAsync(consumer.receive().getMessageId(),
transaction).get();
+ consumer.acknowledgeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), transaction).get();
transaction.abort().get();
// consumer receive the message the third time, redeliverCount = 2,
// the message will be sent to DLQ, can't receive
- assertNull(consumer.receive(3, TimeUnit.SECONDS));
+ assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
@@ -1394,7 +1391,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
.build().get();
- Message<byte[]> message = consumer.receive();
+ Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
assertEquals(value1, new String(message.getValue()));
// consumer receive the batch message one the first time,
redeliverCount = 0
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
@@ -1404,7 +1401,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// consumer will receive the batch message two and then receive
// the message one and message two again, redeliverCount = 1
for (int i = 0; i < 3; i ++) {
- message = consumer.receive();
+ message = consumer.receive(5, TimeUnit.SECONDS);
}
transaction = pulsarClient.newTransaction().withTransactionTimeout(5,
TimeUnit.MINUTES)
@@ -1418,7 +1415,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// consumer receive the batch message the third time, redeliverCount =
2,
// the message will be sent to DLQ, can't receive
- assertNull(consumer.receive(3, TimeUnit.SECONDS));
+ assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 6);
@@ -1473,7 +1470,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// Failover consumer will receive the messages immediately while
// the shared consumer will get them after the delay
- Message<String> msg = sharedConsumer.receive(1, TimeUnit.SECONDS);
+ Message<String> msg = sharedConsumer.receive(300,
TimeUnit.MILLISECONDS);
assertNull(msg);
for (int i = 0; i < 10; i++) {