codelipenghui commented on a change in pull request #12521:
URL: https://github.com/apache/pulsar/pull/12521#discussion_r740033610
##########
File path:
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TransactionEndToEndTest.java
##########
@@ -960,4 +960,36 @@ public void oneTransactionOneTopicWithMultiSubTest() throws Exception {
}
assertTrue(flag);
}
+
+ @Test
+ public void testTxnTimeOutInClient() throws Exception{
+ String topic = NAMESPACE1 + "/testTxnTimeOutInClient";
+ Producer producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
+ .topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();
+ Consumer consumer = pulsarClient.newConsumer(Schema.STRING).consumerName("testTxnTimeOut_consumer")
+ .topic(topic).subscriptionName("testTxnTimeOut_sub").subscribe();
+
+ Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.SECONDS)
+ .build().get();
+ producer.newMessage().send();
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(((TransactionImpl)transaction).getState(), TransactionImpl.State.ABORTED);
+ });
+
+ try {
+ producer.newMessage(transaction).send();
+ Assert.fail();
+ } catch (Exception e) {
+ Assert.assertTrue(e.getCause().getCause() instanceof TransactionCoordinatorClientException
Review comment:
This will confuse users because they did not abort the transaction themselves;
we should use a TransactionTimeoutException here.
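A rough sketch of the distinction, not taken from the PR: the classes below are hypothetical stand-ins rather than the Pulsar types, and the `timedOut` flag is an assumed way of recording why the transaction left the OPEN state.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for illustration only; these are not the Pulsar classes.
final class TxnTimeoutSketch {

    enum State { OPEN, ABORTED }

    // Raised when the transaction is not OPEN for a reason other than a timeout.
    static class InvalidTxnStatusException extends Exception {
        InvalidTxnStatusException(String message) {
            super(message);
        }
    }

    // Raised when the transaction left the OPEN state because its timeout expired.
    static class TransactionTimeoutException extends Exception {
        TransactionTimeoutException(String message) {
            super(message);
        }
    }

    private TxnTimeoutSketch() {
    }

    // Returns a completed future when the transaction is OPEN, otherwise a
    // failed future whose exception type says why it is no longer usable.
    // The timedOut parameter is an assumption of this sketch.
    static CompletableFuture<Void> checkOpen(String txnId, State state, boolean timedOut) {
        if (state == State.OPEN) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> failed = new CompletableFuture<>();
        if (timedOut) {
            failed.completeExceptionally(
                    new TransactionTimeoutException("[" + txnId + "] transaction timed out"));
        } else {
            failed.completeExceptionally(
                    new InvalidTxnStatusException("[" + txnId + "] with unexpected state : " + state));
        }
        return failed;
    }
}
```

With something along these lines, the test could assert on the timeout exception type instead of on a status exception that suggests the user aborted the transaction.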
##########
File path:
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
##########
@@ -490,6 +491,15 @@ public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUni
if (null != txn) {
checkArgument(txn instanceof TransactionImpl);
txnImpl = (TransactionImpl) txn;
+ if (txnImpl.getState() != TransactionImpl.State.OPEN) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ completableFuture
+ .completeExceptionally(new TransactionCoordinatorClientException
+ .InvalidTxnStatusException("["+ txn.getTxnID().toString() +"] with unexpected state : "
Review comment:
It would be better to create a template error message for
InvalidTxnStatusException, for example with a String formatter, so that the
same format can be reused wherever this check appears.
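For example, something along these lines. This is only a sketch: the class name, constant name, and helper method are made up for illustration, and the template stops at the state because the rest of the message is cut off in the diff above.

```java
// Hypothetical helper for illustration; not part of the Pulsar code base.
final class TxnErrorMessages {

    // Template mirroring the prefix visible in the diff: "[<txnId>] with unexpected state : <state>".
    static final String UNEXPECTED_STATE_TEMPLATE = "[%s] with unexpected state : %s";

    private TxnErrorMessages() {
    }

    // Builds the message from the shared template so every call site formats it the same way.
    static String unexpectedState(Object txnId, Object state) {
        return String.format(UNEXPECTED_STATE_TEMPLATE, txnId, state);
    }

    public static void main(String[] args) {
        // prints "[(1,2)] with unexpected state : ABORTED"
        System.out.println(unexpectedState("(1,2)", "ABORTED"));
    }
}
```

Each place that currently concatenates the message by hand could then pass the transaction ID and state to the helper, or the exception could get a constructor that takes both and applies the template itself.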