This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 3cea10a5533 [fix][txn] Allow producer enable send timeout in 
transaction (#16519)
3cea10a5533 is described below

commit 3cea10a5533df5f346558b28ebc1cfa4628e76c9
Author: congbo <[email protected]>
AuthorDate: Mon Jul 11 18:20:33 2022 +0800

    [fix][txn] Allow producer enable send timeout in transaction (#16519)
    
    (cherry picked from commit bbf2a47867ff54327c1f2940e72f08a44a5dc5f7)
---
 .../client/impl/TransactionEndToEndTest.java       | 42 ++++++++++++++++++++--
 .../apache/pulsar/client/impl/ProducerBase.java    |  5 ---
 2 files changed, 40 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index b4f43f8e838..d3e8674925c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -19,6 +19,10 @@
 package org.apache.pulsar.client.impl;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
@@ -33,10 +37,10 @@ import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
-
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.EventExecutor;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
-
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1061,4 +1065,38 @@ public class TransactionEndToEndTest extends 
TransactionTestBase {
                     .InvalidTxnStatusException);
         }
     }
+
+    @Test
+    public void testSendTxnMessageTimeout() throws Exception {
+        String topic = NAMESPACE1 + "/testSendTxnMessageTimeout";
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) 
pulsarClient.newProducer()
+                .topic(topic)
+                .sendTimeout(1, TimeUnit.SECONDS)
+                .create();
+
+        Transaction transaction = 
pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.SECONDS)
+                .build().get();
+
+        // mock cnx, send message can't receive response
+        ClientCnx cnx = mock(ClientCnx.class);
+        ChannelHandlerContext channelHandlerContext = 
mock(ChannelHandlerContext.class);
+        doReturn(channelHandlerContext).when(cnx).ctx();
+        EventExecutor eventExecutor = mock(EventExecutor.class);
+        doReturn(eventExecutor).when(channelHandlerContext).executor();
+        CompletableFuture<ProducerResponse> completableFuture = new 
CompletableFuture<>();
+        completableFuture.complete(new ProducerResponse("test", 1,
+                "1".getBytes(), Optional.of(30L)));
+        doReturn(completableFuture).when(cnx).sendRequestWithId(anyObject(), 
anyLong());
+        producer.getConnectionHandler().setClientCnx(cnx);
+
+
+        try {
+            // send message with txn use mock cnx, will not receive send 
response
+            producer.newMessage(transaction).value("Hello 
Pulsar!".getBytes()).send();
+            fail();
+        } catch (PulsarClientException ex) {
+            assertTrue(ex instanceof PulsarClientException.TimeoutException);
+        }
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 180319034da..053fb529596 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -92,11 +92,6 @@ public abstract class ProducerBase<T> extends HandlerState 
implements Producer<T
     public TypedMessageBuilder<T> newMessage(Transaction txn) {
         checkArgument(txn instanceof TransactionImpl);
 
-        // check the producer has proper settings to send transactional 
messages
-        if (conf.getSendTimeoutMs() > 0) {
-            throw new IllegalArgumentException("Only producers disabled 
sendTimeout are allowed to"
-                + " produce transactional messages");
-        }
 
         return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl) 
txn);
     }

Reply via email to