This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit bf92418f8247e536098f2a86e6f51318c50eea99 Author: Malla Sandeep <[email protected]> AuthorDate: Fri Feb 6 00:51:54 2026 +0530 [fix][client] Fix producer synchronous retry handling in failPendingMessages method (#25207) (cherry picked from commit 611efe4a77a4cc9ec1875d4ee5fd29916d3f75c7) --- .../pulsar/client/impl/ProducerSyncRetryTest.java | 99 ++++++++++++++++++++++ .../apache/pulsar/client/impl/ProducerImpl.java | 22 +++-- .../pulsar/client/impl/ProducerImplTest.java | 75 ++++++++++++++++ 3 files changed, 188 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSyncRetryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSyncRetryTest.java new file mode 100644 index 00000000000..359db97fc78 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSyncRetryTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.ProducerConsumerBase; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +@Test(groups = "broker-impl") +public class ProducerSyncRetryTest extends ProducerConsumerBase { + + @Override + @BeforeMethod + public void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @Override + @AfterMethod(alwaysRun = true) + public void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test(timeOut = 30000) + public void testProducerSyncRetryAfterTimeout() throws Exception { + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp"); + @Cleanup + ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer() + .topic(topic) + .enableBatching(false) + .sendTimeout(1, TimeUnit.MILLISECONDS) // force timeout + .create(); + + // To make sure first message is timed out + this.stopBroker(); + + // First message will get timed out, then be retried with same payload + ByteBuffer payload = ByteBuffer.wrap(new byte[0]); + MessageMetadata messageMetadata = new MessageMetadata(); + messageMetadata.setUncompressedSize(1); + MessageImpl<byte[]> message = MessageImpl.create(messageMetadata, payload, Schema.BYTES, topic); + + MessageMetadata retryMessageMetadata = new MessageMetadata(); + retryMessageMetadata.setUncompressedSize(1); + MessageImpl<byte[]> retryMessage = MessageImpl.create(retryMessageMetadata, payload, Schema.BYTES, topic); + + // First send is expected to fail + CompletableFuture<MessageId> firstSend = producer.sendAsync(message); + producer.triggerSendTimer(); + + // Waits until firstSend returns timeout exception + CompletableFuture<MessageId> retrySend = + firstSend.handle((msgId, ex) -> { + assertNotNull(ex, "First send must timeout"); + assertTrue(ex instanceof PulsarClientException.TimeoutException); + try { + // Retry should succeed + this.startBroker(); + } catch (Exception e) { + throw new RuntimeException(e); + } + producer.conf.setSendTimeoutMs(10000); + return producer.sendAsync(retryMessage); + }).thenCompose(f -> f); + + // Wait until retry completes successfully + MessageId retryMessageId = retrySend.join(); + assertNotNull(retryMessageId); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index c33ae1940a5..c001d8f8ba5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -329,7 +329,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne return connectionHandler; } - private boolean isBatchMessagingEnabled() { + public boolean isBatchMessagingEnabled() { return conf.isBatchingEnabled(); } @@ -1721,11 +1721,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne messagesCount.set(0); } - public void remove() { + public OpSendMsg remove() { OpSendMsg op = delegate.remove(); if (op != null) { messagesCount.addAndGet(-op.numMessagesInBatch); } + return op; } public OpSendMsg peek() { @@ -2195,14 +2196,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne } /** - * This fails and clears the pending messages with the given exception. This method should be called from within the - * ProducerImpl object mutex. + * This fails the pending messages at the start of the call, without dropping newly enqueued + * retry messages. This method should be called from within the ProducerImpl object mutex. */ - private synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { + @VisibleForTesting + synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) { if (cnx == null) { final AtomicInteger releaseCount = new AtomicInteger(); final boolean batchMessagingEnabled = isBatchMessagingEnabled(); - pendingMessages.forEach(op -> { + // Track message count to fail so that newly added messages by synchronous retries + // triggered by op.sendComplete(ex); don't get removed + int pendingMessagesToFailCount = pendingMessages.size(); + + for (int i = 0; i < pendingMessagesToFailCount; i++) { + OpSendMsg op = pendingMessages.remove(); releaseCount.addAndGet(batchMessagingEnabled ? op.numMessagesInBatch : 1); try { // Need to protect ourselves from any exception being thrown in the future handler from the @@ -2221,9 +2228,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne client.getMemoryLimitController().releaseMemory(op.uncompressedSize); ReferenceCountUtil.safeRelease(op.cmd); op.recycle(); - }); + } - pendingMessages.clear(); semaphoreRelease(releaseCount.get()); if (batchMessagingEnabled) { failPendingBatchMessages(ex); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java index 7c6bcf2cf06..a91340517d7 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java @@ -26,8 +26,11 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import java.nio.ByteBuffer; +import org.apache.commons.lang3.reflect.FieldUtils; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.common.api.proto.MessageMetadata; +import org.apache.pulsar.common.protocol.ByteBufPair; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -65,4 +68,76 @@ public class ProducerImplTest { assertTrue(producer.populateMessageSchema(msg, null)); verify(msg).setSchemaState(MessageImpl.SchemaState.Ready); } + + @Test + public void testFailPendingMessagesSyncRetry() + throws Exception { + ProducerImpl<byte[]> producer = + Mockito.mock(ProducerImpl.class, Mockito.CALLS_REAL_METHODS); + // Disable batching + Mockito.doReturn(false) + .when(producer) + .isBatchMessagingEnabled(); + + // Stub semaphore release (not under test) + Mockito.doNothing() + .when(producer) + .semaphoreRelease(Mockito.anyInt()); + + // Stub client cleanup path (not under test) + PulsarClientImpl client = Mockito.mock(PulsarClientImpl.class); + Mockito.when(client.getMemoryLimitController()) + .thenReturn(Mockito.mock(MemoryLimitController.class)); + FieldUtils.writeField(producer, "client", client, true); + + // Real pending queue + ProducerImpl.OpSendMsgQueue pendingQueue = new ProducerImpl.OpSendMsgQueue(); + FieldUtils.writeField(producer, "pendingMessages", pendingQueue, true); + + // OpSendMsg that retries reentrantly + MessageImpl<?> msg = Mockito.mock(MessageImpl.class); + Mockito.when(msg.getUncompressedSize()).thenReturn(10); + ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create( + msg, + Mockito.mock(ByteBufPair.class), + 1L, + Mockito.mock(SendCallback.class) + ); + op.totalChunks = 1; + op.chunkId = 0; + op.numMessagesInBatch = 1; + + MessageImpl<?> retryMsg = Mockito.mock(MessageImpl.class); + Mockito.when(retryMsg.getUncompressedSize()).thenReturn(10); + + // Override sendComplete to Reentrant retry via spy + ProducerImpl.OpSendMsg firstSpy = Mockito.spy(op); + Mockito.doAnswer(invocation -> { + // Reentrant retry during callback + ProducerImpl.OpSendMsg retryOp = ProducerImpl.OpSendMsg.create( + retryMsg, + Mockito.mock(ByteBufPair.class), + 2L, + Mockito.mock(SendCallback.class) + ); + retryOp.totalChunks = 1; + retryOp.chunkId = 0; + retryOp.numMessagesInBatch = 1; + pendingQueue.add(retryOp); + return null; + }).when(firstSpy).sendComplete(Mockito.any()); + Mockito.doNothing() + .when(firstSpy) + .recycle(); + + // Seed initial pending message + pendingQueue.add(firstSpy); + + // Invoke failPendingMessages(null, ex) + producer.failPendingMessages(null, new PulsarClientException.TimeoutException("timeout")); + assertEquals(producer.getPendingQueueSize(), 1, + "Retry Op should exist in the pending Queue"); + assertEquals(pendingQueue.peek().sequenceId, 2L, + "Retry Op SequenceId should match with the one in pendingQueue"); + } }
