This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 58c82a71beb Revert #15524: do not redeliver message when txn is not opened (#15593)
58c82a71beb is described below
commit 58c82a71beb7506e422def391af532945be2b7a7
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat May 14 17:47:25 2022 +0800
Revert #15524: do not redeliver message when txn is not opened (#15593)
---
.../client/impl/TransactionEndToEndTest.java | 56 +---------------------
.../apache/pulsar/client/impl/ConsumerBase.java | 1 -
2 files changed, 1 insertion(+), 56 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 4beec09006d..bfa19b2496a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1052,67 +1052,13 @@ public class TransactionEndToEndTest extends TransactionTestBase {
Assert.assertTrue(e.getCause().getCause() instanceof
TransactionCoordinatorClientException
.InvalidTxnStatusException);
}
- Message<String> message = null;
try {
- message = consumer.receive();
+ Message<String> message = consumer.receive();
consumer.acknowledgeAsync(message.getMessageId(),
transaction).get();
Assert.fail();
} catch (Exception e) {
Assert.assertTrue(e.getCause() instanceof
TransactionCoordinatorClientException
.InvalidTxnStatusException);
}
- Message<String> message1 = consumer.receive(5, TimeUnit.SECONDS);
- Assert.assertEquals(message.getMessageId(), message1.getMessageId());
- }
-
- @Test
- public void testTxnTimeRedeliverForShareSub() throws Exception{
- String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
- @Cleanup
- Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
- .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
- @Cleanup
- Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
- .consumerName("testTxnTimeOut_consumer1")
- .topic(topic)
- .subscriptionName("testTxnTimeOut_sub1")
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- @Cleanup
- Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
- .consumerName("testTxnTimeOut_consumer2")
- .topic(topic)
- .subscriptionName("testTxnTimeOut_sub2")
- .subscriptionType(SubscriptionType.Shared)
- .subscribe();
-
- Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
- .build().get();
- for (int i = 0; i < 5; i++) {
- producer.newMessage().send();
- }
-
- Awaitility.await().untilAsserted(() -> {
- Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.TIMEOUT);
- });
-
-
- Message<String> message = null;
- try {
- message = consumer1.receive();
- consumer1.acknowledgeAsync(message.getMessageId(), transaction).get();
- Assert.fail();
- } catch (Exception e) {
- Assert.assertTrue(e.getCause() instanceof TransactionCoordinatorClientException
- .InvalidTxnStatusException);
- }
-
- Message<String> message1 = consumer1.receive(5, TimeUnit.SECONDS);
- Message<String> message2 = consumer2.receive(5, TimeUnit.SECONDS);
- Assert.assertTrue(message1.getMessageId().equals(message.getMessageId())
- && !message2.getMessageId().equals(message.getMessageId())
- || message2.getMessageId().equals(message.getMessageId())
- && !message1.getMessageId().equals(message.getMessageId()));
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index a87f8c62f18..168b903b1cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -571,7 +571,6 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
txnImpl = (TransactionImpl) txn;
CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
if (!txnImpl.checkIfOpen(completableFuture)) {
- redeliverUnacknowledgedMessages(Collections.singleton(messageId));
return completableFuture;
}
}