This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new 3cea10a5533 [fix][txn] Allow producer enable send timeout in
transaction (#16519)
3cea10a5533 is described below
commit 3cea10a5533df5f346558b28ebc1cfa4628e76c9
Author: congbo <[email protected]>
AuthorDate: Mon Jul 11 18:20:33 2022 +0800
[fix][txn] Allow producer enable send timeout in transaction (#16519)
(cherry picked from commit bbf2a47867ff54327c1f2940e72f08a44a5dc5f7)
---
.../client/impl/TransactionEndToEndTest.java | 42 ++++++++++++++++++++--
.../apache/pulsar/client/impl/ProducerBase.java | 5 ---
2 files changed, 40 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
index b4f43f8e838..d3e8674925c 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
@@ -19,6 +19,10 @@
package org.apache.pulsar.client.impl;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
@@ -33,10 +37,10 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.EventExecutor;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
-
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -1061,4 +1065,38 @@ public class TransactionEndToEndTest extends
TransactionTestBase {
.InvalidTxnStatusException);
}
}
+
+ @Test
+ public void testSendTxnMessageTimeout() throws Exception {
+ String topic = NAMESPACE1 + "/testSendTxnMessageTimeout";
+ @Cleanup
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
pulsarClient.newProducer()
+ .topic(topic)
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .create();
+
+ Transaction transaction =
pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.SECONDS)
+ .build().get();
+
+ // mock cnx, send message can't receive response
+ ClientCnx cnx = mock(ClientCnx.class);
+ ChannelHandlerContext channelHandlerContext =
mock(ChannelHandlerContext.class);
+ doReturn(channelHandlerContext).when(cnx).ctx();
+ EventExecutor eventExecutor = mock(EventExecutor.class);
+ doReturn(eventExecutor).when(channelHandlerContext).executor();
+ CompletableFuture<ProducerResponse> completableFuture = new
CompletableFuture<>();
+ completableFuture.complete(new ProducerResponse("test", 1,
+ "1".getBytes(), Optional.of(30L)));
+ doReturn(completableFuture).when(cnx).sendRequestWithId(anyObject(),
anyLong());
+ producer.getConnectionHandler().setClientCnx(cnx);
+
+
+ try {
+ // send message with txn use mock cnx, will not receive send
response
+ producer.newMessage(transaction).value("Hello
Pulsar!".getBytes()).send();
+ fail();
+ } catch (PulsarClientException ex) {
+ assertTrue(ex instanceof PulsarClientException.TimeoutException);
+ }
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
index 180319034da..053fb529596 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerBase.java
@@ -92,11 +92,6 @@ public abstract class ProducerBase<T> extends HandlerState
implements Producer<T
public TypedMessageBuilder<T> newMessage(Transaction txn) {
checkArgument(txn instanceof TransactionImpl);
- // check the producer has proper settings to send transactional
messages
- if (conf.getSendTimeoutMs() > 0) {
- throw new IllegalArgumentException("Only producers disabled
sendTimeout are allowed to"
- + " produce transactional messages");
- }
return new TypedMessageBuilderImpl<>(this, schema, (TransactionImpl)
txn);
}