This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit bf92418f8247e536098f2a86e6f51318c50eea99
Author: Malla Sandeep <[email protected]>
AuthorDate: Fri Feb 6 00:51:54 2026 +0530

    [fix][client] Fix producer synchronous retry handling in failPendingMessages method (#25207)
    
    (cherry picked from commit 611efe4a77a4cc9ec1875d4ee5fd29916d3f75c7)
---
 .../pulsar/client/impl/ProducerSyncRetryTest.java  | 99 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 22 +++--
 .../pulsar/client/impl/ProducerImplTest.java       | 75 ++++++++++++++++
 3 files changed, 188 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSyncRetryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSyncRetryTest.java
new file mode 100644
index 00000000000..359db97fc78
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSyncRetryTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker-impl")
+public class ProducerSyncRetryTest extends ProducerConsumerBase {
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+        super.producerBaseSetup();
+    }
+
+    @Override
+    @AfterMethod(alwaysRun = true)
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test(timeOut = 30000)
+    public void testProducerSyncRetryAfterTimeout() throws Exception {
+        final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/tp");
+        @Cleanup
+        ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                .topic(topic)
+                .enableBatching(false)
+                .sendTimeout(1, TimeUnit.MILLISECONDS) // force timeout
+                .create();
+
+        // To make sure first message is timed out
+        this.stopBroker();
+
+        // First message will get timed out, then be retried with same payload
+        ByteBuffer payload = ByteBuffer.wrap(new byte[0]);
+        MessageMetadata messageMetadata = new MessageMetadata();
+        messageMetadata.setUncompressedSize(1);
+        MessageImpl<byte[]> message = MessageImpl.create(messageMetadata, payload, Schema.BYTES, topic);
+
+        MessageMetadata retryMessageMetadata = new MessageMetadata();
+        retryMessageMetadata.setUncompressedSize(1);
+        MessageImpl<byte[]> retryMessage = MessageImpl.create(retryMessageMetadata, payload, Schema.BYTES, topic);
+
+        // First send is expected to fail
+        CompletableFuture<MessageId> firstSend = producer.sendAsync(message);
+        producer.triggerSendTimer();
+
+        // Waits until firstSend returns timeout exception
+        CompletableFuture<MessageId> retrySend =
+                firstSend.handle((msgId, ex) -> {
+                    assertNotNull(ex, "First send must timeout");
+                    assertTrue(ex instanceof PulsarClientException.TimeoutException);
+                    try {
+                        // Retry should succeed
+                        this.startBroker();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                    producer.conf.setSendTimeoutMs(10000);
+                    return producer.sendAsync(retryMessage);
+                }).thenCompose(f -> f);
+
+        // Wait until retry completes successfully
+        MessageId retryMessageId = retrySend.join();
+        assertNotNull(retryMessageId);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index c33ae1940a5..c001d8f8ba5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -329,7 +329,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
         return connectionHandler;
     }
 
-    private boolean isBatchMessagingEnabled() {
+    public boolean isBatchMessagingEnabled() {
         return conf.isBatchingEnabled();
     }
 
@@ -1721,11 +1721,12 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
             messagesCount.set(0);
         }
 
-        public void remove() {
+        public OpSendMsg remove() {
             OpSendMsg op = delegate.remove();
             if (op != null) {
                 messagesCount.addAndGet(-op.numMessagesInBatch);
             }
+            return op;
         }
 
         public OpSendMsg peek() {
@@ -2195,14 +2196,20 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
     }
 
     /**
-     * This fails and clears the pending messages with the given exception. This method should be called from within the
-     * ProducerImpl object mutex.
+     * This fails the pending messages at the start of the call, without dropping newly enqueued
+     * retry messages. This method should be called from within the ProducerImpl object mutex.
      */
-    private synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
+    @VisibleForTesting
+    synchronized void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
         if (cnx == null) {
             final AtomicInteger releaseCount = new AtomicInteger();
             final boolean batchMessagingEnabled = isBatchMessagingEnabled();
-            pendingMessages.forEach(op -> {
+            // Track the number of messages to fail so that messages newly added by synchronous retries
+            // (triggered by op.sendComplete(ex)) don't get removed
+            int pendingMessagesToFailCount = pendingMessages.size();
+
+            for (int i = 0; i < pendingMessagesToFailCount; i++) {
+                OpSendMsg op = pendingMessages.remove();
                 releaseCount.addAndGet(batchMessagingEnabled ? op.numMessagesInBatch : 1);
                 try {
                     // Need to protect ourselves from any exception being thrown in the future handler from the
@@ -2221,9 +2228,8 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
                 ReferenceCountUtil.safeRelease(op.cmd);
                 op.recycle();
-            });
+            }
 
-            pendingMessages.clear();
             semaphoreRelease(releaseCount.get());
             if (batchMessagingEnabled) {
                 failPendingBatchMessages(ex);
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
index 7c6bcf2cf06..a91340517d7 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerImplTest.java
@@ -26,8 +26,11 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import java.nio.ByteBuffer;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.ByteBufPair;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
@@ -65,4 +68,76 @@ public class ProducerImplTest {
         assertTrue(producer.populateMessageSchema(msg, null));
         verify(msg).setSchemaState(MessageImpl.SchemaState.Ready);
     }
+
+    @Test
+    public void testFailPendingMessagesSyncRetry()
+            throws Exception {
+        ProducerImpl<byte[]> producer =
+                Mockito.mock(ProducerImpl.class, Mockito.CALLS_REAL_METHODS);
+        // Disable batching
+        Mockito.doReturn(false)
+                .when(producer)
+                .isBatchMessagingEnabled();
+
+        // Stub semaphore release (not under test)
+        Mockito.doNothing()
+                .when(producer)
+                .semaphoreRelease(Mockito.anyInt());
+
+        // Stub client cleanup path (not under test)
+        PulsarClientImpl client = Mockito.mock(PulsarClientImpl.class);
+        Mockito.when(client.getMemoryLimitController())
+                .thenReturn(Mockito.mock(MemoryLimitController.class));
+        FieldUtils.writeField(producer, "client", client, true);
+
+        // Real pending queue
+        ProducerImpl.OpSendMsgQueue pendingQueue = new ProducerImpl.OpSendMsgQueue();
+        FieldUtils.writeField(producer, "pendingMessages", pendingQueue, true);
+
+        // OpSendMsg that retries reentrantly
+        MessageImpl<?> msg = Mockito.mock(MessageImpl.class);
+        Mockito.when(msg.getUncompressedSize()).thenReturn(10);
+        ProducerImpl.OpSendMsg op = ProducerImpl.OpSendMsg.create(
+                msg,
+                Mockito.mock(ByteBufPair.class),
+                1L,
+                Mockito.mock(SendCallback.class)
+        );
+        op.totalChunks = 1;
+        op.chunkId = 0;
+        op.numMessagesInBatch = 1;
+
+        MessageImpl<?> retryMsg = Mockito.mock(MessageImpl.class);
+        Mockito.when(retryMsg.getUncompressedSize()).thenReturn(10);
+
+        // Override sendComplete via a spy to simulate a reentrant retry
+        ProducerImpl.OpSendMsg firstSpy = Mockito.spy(op);
+        Mockito.doAnswer(invocation -> {
+            // Reentrant retry during callback
+            ProducerImpl.OpSendMsg retryOp = ProducerImpl.OpSendMsg.create(
+                    retryMsg,
+                    Mockito.mock(ByteBufPair.class),
+                    2L,
+                    Mockito.mock(SendCallback.class)
+            );
+            retryOp.totalChunks = 1;
+            retryOp.chunkId = 0;
+            retryOp.numMessagesInBatch = 1;
+            pendingQueue.add(retryOp);
+            return null;
+        }).when(firstSpy).sendComplete(Mockito.any());
+        Mockito.doNothing()
+                .when(firstSpy)
+                .recycle();
+
+        // Seed initial pending message
+        pendingQueue.add(firstSpy);
+
+        // Invoke failPendingMessages(null, ex)
+        producer.failPendingMessages(null, new PulsarClientException.TimeoutException("timeout"));
+        assertEquals(producer.getPendingQueueSize(), 1,
+                "Retry Op should exist in the pending Queue");
+        assertEquals(pendingQueue.peek().sequenceId, 2L,
+                "Retry Op SequenceId should match with the one in pendingQueue");
+    }
 }

Reply via email to