This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 58c82a71beb Revert #15524: do not redeliver message when txn is not opened (#15593)
58c82a71beb is described below

commit 58c82a71beb7506e422def391af532945be2b7a7
Author: Xiangying Meng <[email protected]>
AuthorDate: Sat May 14 17:47:25 2022 +0800

    Revert #15524: do not redeliver message when txn is not opened (#15593)
---
 .../client/impl/TransactionEndToEndTest.java       | 56 +---------------------
 .../apache/pulsar/client/impl/ConsumerBase.java    |  1 -
 2 files changed, 1 insertion(+), 56 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index 4beec09006d..bfa19b2496a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -1052,67 +1052,13 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
             Assert.assertTrue(e.getCause().getCause() instanceof 
TransactionCoordinatorClientException
                     .InvalidTxnStatusException);
         }
-        Message<String> message = null;
         try {
-            message = consumer.receive();
+            Message<String> message = consumer.receive();
             consumer.acknowledgeAsync(message.getMessageId(), 
transaction).get();
             Assert.fail();
         } catch (Exception e) {
             Assert.assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException
                     .InvalidTxnStatusException);
         }
-        Message<String> message1 = consumer.receive(5, TimeUnit.SECONDS);
-        Assert.assertEquals(message.getMessageId(), message1.getMessageId());
-    }
-
-    @Test
-    public void testTxnTimeRedeliverForShareSub() throws Exception{
-        String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
-        @Cleanup
-        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
-                .topic(topic).sendTimeout(0, 
TimeUnit.SECONDS).enableBatching(false).create();
-        @Cleanup
-        Consumer<String> consumer1 = pulsarClient.newConsumer(Schema.STRING)
-                .consumerName("testTxnTimeOut_consumer1")
-                .topic(topic)
-                .subscriptionName("testTxnTimeOut_sub1")
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        @Cleanup
-        Consumer<String> consumer2 = pulsarClient.newConsumer(Schema.STRING)
-                .consumerName("testTxnTimeOut_consumer2")
-                .topic(topic)
-                .subscriptionName("testTxnTimeOut_sub2")
-                .subscriptionType(SubscriptionType.Shared)
-                .subscribe();
-
-        Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
-                .build().get();
-        for (int i = 0; i < 5; i++) {
-            producer.newMessage().send();
-        }
-
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(((TransactionImpl)transaction).getState(), 
TransactionImpl.State.TIMEOUT);
-        });
-
-
-        Message<String> message = null;
-        try {
-            message = consumer1.receive();
-            consumer1.acknowledgeAsync(message.getMessageId(), 
transaction).get();
-            Assert.fail();
-        } catch (Exception e) {
-            Assert.assertTrue(e.getCause() instanceof 
TransactionCoordinatorClientException
-                    .InvalidTxnStatusException);
-        }
-
-        Message<String> message1 = consumer1.receive(5, TimeUnit.SECONDS);
-        Message<String> message2 = consumer2.receive(5, TimeUnit.SECONDS);
-        
Assert.assertTrue(message1.getMessageId().equals(message.getMessageId())
-                && !message2.getMessageId().equals(message.getMessageId())
-                || message2.getMessageId().equals(message.getMessageId())
-                && !message1.getMessageId().equals(message.getMessageId()));
     }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index a87f8c62f18..168b903b1cf 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -571,7 +571,6 @@ public abstract class ConsumerBase<T> extends HandlerState 
implements Consumer<T
             txnImpl = (TransactionImpl) txn;
             CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
            if (!txnImpl.checkIfOpen(completableFuture)) {
-               
redeliverUnacknowledgedMessages(Collections.singleton(messageId));
                return completableFuture;
            }
         }

Reply via email to