This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dbb157793b9 [fix][test]Fix flaky test because the too short receive
time (#21273)
dbb157793b9 is described below
commit dbb157793b90e203f01a630fb0f0abafbb337762
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat Oct 7 18:01:02 2023 +0800
[fix][test]Fix flaky test because the too short receive time (#21273)
---
.../client/impl/TransactionEndToEndTest.java | 104 +++++++++++----------
1 file changed, 53 insertions(+), 51 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 348fb04b7dd..da8492e612f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -96,6 +96,8 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
protected static final String TOPIC_OUTPUT = NAMESPACE1 + "/output";
protected static final String TOPIC_MESSAGE_ACK_TEST = NAMESPACE1 +
"/message-ack-test";
protected static final int NUM_PARTITIONS = 16;
+ private static final int waitTimeForCanReceiveMsgInSec = 5;
+ private static final int waitTimeForCannotReceiveMsgInSec = 5;
@BeforeClass
protected void setup() throws Exception {
conf.setAcknowledgmentAtBatchIndexLevelEnabled(true);
@@ -173,7 +175,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
// can't receive message anymore
- assertNull(consumer.receive(2, TimeUnit.SECONDS));
+ assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS));
}
@@ -240,14 +242,14 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.enableBatchIndexAcknowledgment(true)
.subscribe();
- Message<Integer> message = consumer.receive(3, TimeUnit.SECONDS);
+ Message<Integer> message =
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNull(message);
// abort txn1
txn1.abort().get();
// after txn1 aborted, consumer will receive messages txn1 contains
int receiveCounter = 0;
- while((message = consumer.receive(3, TimeUnit.SECONDS)) != null) {
+ while((message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS)) != null) {
Assert.assertEquals(message.getValue().intValue(), receiveCounter);
receiveCounter ++;
}
@@ -347,7 +349,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
// Can't receive transaction messages before commit.
- Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNull(message);
txn1.commit().get();
@@ -355,13 +357,13 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
int receiveCnt = 0;
for (int i = 0; i < txnMessageCnt; i++) {
- message = consumer.receive(5, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNotNull(message);
receiveCnt ++;
}
Assert.assertEquals(txnMessageCnt, receiveCnt);
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNull(message);
// cleanup.
@@ -398,13 +400,13 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Awaitility.await().until(consumer::isConnected);
// Can't receive transaction messages before abort.
- Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNull(message);
txn.abort().get();
// Can't receive transaction messages after abort.
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNull(message);
Awaitility.await().until(() -> {
boolean flag = true;
@@ -492,7 +494,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Transaction txn = getTxn();
for (int i = 0; i < messageCount / 2; i++) {
- Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}
@@ -572,14 +574,14 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// consume and ack messages with txn
for (int i = 0; i < messageCnt; i++) {
- Message<byte[]> message = consumer.receive(5,
TimeUnit.SECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("receive msgId: {}, count : {}",
message.getMessageId(), i);
consumer.acknowledgeAsync(message.getMessageId(), txn).get();
}
// the messages are pending ack state and can't be received
- Message<byte[]> message = consumer.receive(300,
TimeUnit.MILLISECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNull(message);
// 1) txn abort
@@ -588,7 +590,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// after transaction abort, the messages could be received
Transaction commitTxn = getTxn();
for (int i = 0; i < messageCnt; i++) {
- message = consumer.receive(2, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNotNull(message);
consumer.acknowledgeAsync(message.getMessageId(),
commitTxn).get();
log.info("receive msgId: {}, count: {}",
message.getMessageId(), i);
@@ -598,7 +600,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
commitTxn.commit().get();
// after transaction commit, the messages can't be received
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNull(message);
Field field = TransactionImpl.class.getDeclaredField("state");
@@ -635,7 +637,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.topic(topicTwo).subscriptionName(sub).subscribe();
String content = "test";
producer.send(content);
- assertEquals(consumer.receive(3, TimeUnit.SECONDS).getValue(),
content);
+ assertEquals(consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getValue(), content);
// cleanup.
producer.close();
@@ -674,7 +676,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
log.info("produce transaction messages finished");
// Can't receive transaction messages before commit.
- Message<byte[]> message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNull(message);
log.info("transaction messages can't be received before transaction
committed");
@@ -683,7 +685,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
int ackedMessageCount = 0;
int receiveCnt = 0;
for (int i = 0; i < messageCnt; i++) {
- message = consumer.receive(5, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNotNull(message);
receiveCnt ++;
if (i % 2 == 0) {
@@ -693,7 +695,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
Assert.assertEquals(messageCnt, receiveCnt);
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNull(message);
String checkTopic = TopicName.get(topic).getPartition(0).toString();
@@ -705,14 +707,14 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
receiveCnt = 0;
for (int i = 0; i < messageCnt - ackedMessageCount; i++) {
- message = consumer.receive(2, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNotNull(message);
consumer.acknowledge(message);
receiveCnt ++;
}
Assert.assertEquals(messageCnt - ackedMessageCount, receiveCnt);
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNull(message);
topic = TopicName.get(topic).getPartition(0).toString();
@@ -803,7 +805,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Message<byte[]> message = null;
Thread.sleep(1000L);
for (int i = 0; i < messageCnt; i++) {
- message = consumer.receive(1, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNotNull(message);
if (i % 3 == 0) {
consumer.acknowledgeCumulativeAsync(message.getMessageId(), abortTxn).get();
@@ -828,14 +830,14 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
// the messages are pending ack state and can't be received
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNull(message);
abortTxn.abort().get();
consumer.redeliverUnacknowledgedMessages();
Transaction commitTxn = getTxn();
for (int i = 0; i < messageCnt; i++) {
- message = consumer.receive(1, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNotNull(message);
if (i % 3 == 0) {
consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTxn).get();
@@ -857,7 +859,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Assert.assertTrue(reCommitError.getCause() instanceof
TransactionNotFoundException);
}
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
Assert.assertNull(message);
}
@@ -919,7 +921,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Awaitility.await().until(consumer::isConnected);
for (int i = 0; i < txnCnt * messageCnt; i++) {
- Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNotNull(message);
}
@@ -960,7 +962,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
txn.commit().get();
for (int i = 0; i < 1000; i++) {
- Message<byte[]> message = consumer.receive(5,
TimeUnit.SECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertEquals(Integer.valueOf(new
String(message.getData())), Integer.valueOf(i));
}
@@ -1016,7 +1018,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
- Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(), consumeTxn).get();
consumeTxn.commit().get();
try {
@@ -1062,7 +1064,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
constructor.setAccessible(true);
TransactionImpl timeoutTxnSkipClientTimeout =
constructor.newInstance(pulsarClient, 5,
- timeoutTxn.getTxnID().getLeastSigBits(),
timeoutTxn.getTxnID().getMostSigBits());
+ timeoutTxn.getTxnID().getLeastSigBits(),
timeoutTxn.getTxnID().getMostSigBits());
try {
timeoutTxnSkipClientTimeout.commit().get();
@@ -1092,7 +1094,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.newTransaction(new TransactionCoordinatorID(0), 1,
null).get();
Awaitility.await().until(() -> {
try {
-
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
+
getPulsarServiceList().get(0).getTransactionMetadataStoreService().getTxnMeta(txnID).get();
return false;
} catch (Exception e) {
return true;
@@ -1125,17 +1127,17 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Transaction consumeTimeoutTxn = pulsarClient
.newTransaction()
- .withTransactionTimeout(3, TimeUnit.SECONDS)
+ .withTransactionTimeout(7, TimeUnit.SECONDS)
.build().get();
- Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+ Message<String> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(),
consumeTimeoutTxn).get();
- Message<String> reReceiveMessage = consumer.receive(300,
TimeUnit.MILLISECONDS);
+ Message<String> reReceiveMessage =
consumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
assertNull(reReceiveMessage);
- reReceiveMessage = consumer.receive(5, TimeUnit.SECONDS);
+ reReceiveMessage = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
assertEquals(reReceiveMessage.getValue(), message.getValue());
@@ -1182,9 +1184,9 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
}
Transaction txn = getTxn();
if (ackType == CommandAck.AckType.Individual) {
- consumer.acknowledgeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), txn);
+
consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getMessageId(), txn);
} else {
- consumer.acknowledgeCumulativeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), txn);
+
consumer.acknowledgeCumulativeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getMessageId(), txn);
}
topic = TopicName.get(topic).toString();
boolean exist = false;
@@ -1307,7 +1309,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.InvalidTxnStatusException);
}
try {
- Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+ Message<String> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message.getMessageId(),
transaction).get();
Assert.fail();
} catch (Exception e) {
@@ -1349,7 +1351,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Message<byte[]> message = null;
for (int i = 0; i < transactionCumulativeAck; i++) {
- message = consumer.receive(5, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
}
// receive transaction in order
@@ -1372,7 +1374,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// receive the rest of the message
for (int i = 0; i < count; i++) {
- message = consumer.receive(5, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
}
Transaction commitTransaction = getTxn();
@@ -1385,7 +1387,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
commitTransaction.commit().get();
// then redeliver will not receive any message
- message = consumer.receive(300, TimeUnit.MILLISECONDS);
+ message = consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS);
assertNull(message);
// cleanup.
@@ -1460,7 +1462,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// receive the batch messages add to a list
for (int i = 0; i < 5; i++) {
- messageIds.add(consumer.receive(5,
TimeUnit.SECONDS).getMessageId());
+ messageIds.add(consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getMessageId());
}
MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);
@@ -1520,7 +1522,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.build().get();
// consumer receive the message the first time, redeliverCount = 0
- consumer.acknowledgeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), transaction).get();
+
consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getMessageId(), transaction).get();
transaction.abort().get();
@@ -1528,17 +1530,17 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.build().get();
// consumer receive the message the second time, redeliverCount = 1,
also can be received
- consumer.acknowledgeAsync(consumer.receive(5,
TimeUnit.SECONDS).getMessageId(), transaction).get();
+
consumer.acknowledgeAsync(consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getMessageId(), transaction).get();
transaction.abort().get();
// consumer receive the message the third time, redeliverCount = 2,
// the message will be sent to DLQ, can't receive
- assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
+ assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS));
assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 3);
- assertEquals(value, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
+ assertEquals(value, new
String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getValue()));
// cleanup.
consumer.close();
@@ -1584,7 +1586,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
.build().get();
- Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
+ Message<byte[]> message =
consumer.receive(waitTimeForCanReceiveMsgInSec, TimeUnit.SECONDS);
assertEquals(value1, new String(message.getValue()));
// consumer receive the batch message one the first time,
redeliverCount = 0
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();
@@ -1594,7 +1596,7 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// consumer will receive the batch message two and then receive
// the message one and message two again, redeliverCount = 1
for (int i = 0; i < 3; i ++) {
- message = consumer.receive(5, TimeUnit.SECONDS);
+ message = consumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
}
transaction = pulsarClient.newTransaction().withTransactionTimeout(5,
TimeUnit.MINUTES)
@@ -1608,12 +1610,12 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// consumer receive the batch message the third time, redeliverCount =
2,
// the message will be sent to DLQ, can't receive
- assertNull(consumer.receive(300, TimeUnit.MILLISECONDS));
+ assertNull(consumer.receive(waitTimeForCannotReceiveMsgInSec,
TimeUnit.SECONDS));
assertEquals(((ConsumerImpl<?>) consumer).getAvailablePermits(), 6);
- assertEquals(value1, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
- assertEquals(value2, new String(deadLetterConsumer.receive(3,
TimeUnit.SECONDS).getValue()));
+ assertEquals(value1, new
String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getValue()));
+ assertEquals(value2, new
String(deadLetterConsumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS).getValue()));
// cleanup.
consumer.close();
@@ -1663,17 +1665,17 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
// Failover consumer will receive the messages immediately while
// the shared consumer will get them after the delay
- Message<String> msg = sharedConsumer.receive(300,
TimeUnit.MILLISECONDS);
+ Message<String> msg =
sharedConsumer.receive(waitTimeForCannotReceiveMsgInSec, TimeUnit.SECONDS);
assertNull(msg);
for (int i = 0; i < 10; i++) {
- msg = failoverConsumer.receive(100, TimeUnit.MILLISECONDS);
+ msg = failoverConsumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
assertEquals(msg.getValue(), "msg-" + i);
}
Set<String> receivedMsgs = new TreeSet<>();
for (int i = 0; i < 10; i++) {
- msg = sharedConsumer.receive(10, TimeUnit.SECONDS);
+ msg = sharedConsumer.receive(waitTimeForCanReceiveMsgInSec,
TimeUnit.SECONDS);
receivedMsgs.add(msg.getValue());
}