This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 64f574a706fb88c3fece21c3e30ee691ddce7505
Author: fengyubiao <[email protected]>
AuthorDate: Sun Apr 27 17:56:31 2025 +0800

    [fix][client] Fix incorrect producer.getPendingQueueSize due to incomplete 
queue implementation (#24184)
    
    Co-authored-by: Lari Hotari <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit a4a34091cbf58bfa080b42c101bd5094d82c41f3)
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 29 +++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    | 33 ++++++++++++++++++++--
 .../pulsar/client/impl/OpSendMsgQueueTest.java     | 18 ++++++++++++
 3 files changed, 78 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 21999989603..8908b9e96c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
@@ -62,6 +63,7 @@ import 
org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -73,6 +75,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -88,6 +91,7 @@ import org.apache.pulsar.common.schema.KeyValueEncodingType;
 import org.apache.pulsar.common.schema.SchemaInfo;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1445,4 +1449,29 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         }
     }
 
+
+    @Test
+    public void testPendingQueueSizeIfIncompatible() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + 
"/ns");
+        admin.namespaces().createNamespace(namespace, 
Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+        final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+                
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
+        producer.newMessage(Schema.STRING).value("msg").sendAsync();
+        AtomicReference<CompletableFuture<MessageId>> latestSend = new 
AtomicReference<>();
+        for (int i = 0; i < 100; i++) {
+            
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(latestSend.get().isDone());
+            assertEquals(producer.getPendingQueueSize(), 0);
+        });
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 3383acfded3..4faaf18dd78 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1645,7 +1645,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
      * This queue is not thread safe.
      */
     protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
-        private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
+        @VisibleForTesting
+        final Queue<OpSendMsg> delegate = new ArrayDeque<>();
         private int forEachDepth = 0;
         private List<OpSendMsg> postponedOpSendMgs;
         private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1702,7 +1703,35 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
         @Override
         public Iterator<OpSendMsg> iterator() {
-            return delegate.iterator();
+            Iterator<OpSendMsg> delegateIterator = delegate.iterator();
+            return new Iterator<OpSendMsg>() {
+                OpSendMsg currentOp;
+
+                @Override
+                public boolean hasNext() {
+                    return delegateIterator.hasNext();
+                }
+
+                @Override
+                public OpSendMsg next() {
+                    currentOp = delegateIterator.next();
+                    return currentOp;
+                }
+
+                @Override
+                public void remove() {
+                    delegateIterator.remove();
+                    if (currentOp != null) {
+                       messagesCount.addAndGet(-currentOp.numMessagesInBatch);
+                       currentOp = null;
+                     }
+                }
+
+                @Override
+                public void forEachRemaining(Consumer<? super OpSendMsg> 
action) {
+                    delegateIterator.forEachRemaining(action);
+                }
+            };
         }
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
index 2db23782640..6dc25f42960 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Lists;
 import java.util.Arrays;
+import java.util.Iterator;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -82,4 +83,21 @@ public class OpSendMsgQueueTest {
         // then
         assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, 
opSendMsg2, opSendMsg3, opSendMsg4));
     }
+
+    @Test
+    public void testIteratorRemove() {
+        ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue();
+        for (int i = 0; i < 10; i++) {
+            queue.add(createDummyOpSendMsg());
+        }
+
+        Iterator<ProducerImpl.OpSendMsg> iterator = queue.iterator();
+        while (iterator.hasNext()) {
+            iterator.next();
+            iterator.remove();
+        }
+        // Verify: the result of "messagesCount()" is 0 after removed all 
items.
+        assertEquals(queue.delegate.size(), 0);
+        assertEquals(queue.messagesCount(), 0);
+    }
 }

Reply via email to