This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2c3cffc8a4de7c91ae10655631955a58d805d3dd
Author: fengyubiao <[email protected]>
AuthorDate: Sun Apr 27 17:56:31 2025 +0800

    [fix][client] Fix incorrect producer.getPendingQueueSize due to incomplete 
queue implementation (#24184)
    
    Co-authored-by: Lari Hotari <[email protected]>
    Co-authored-by: Lari Hotari <[email protected]>
    (cherry picked from commit a4a34091cbf58bfa080b42c101bd5094d82c41f3)
---
 .../java/org/apache/pulsar/schema/SchemaTest.java  | 33 ++++++++++++++++++++--
 .../apache/pulsar/client/impl/ProducerImpl.java    | 33 ++++++++++++++++++++--
 .../pulsar/client/impl/OpSendMsgQueueTest.java     | 18 ++++++++++++
 3 files changed, 80 insertions(+), 4 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index c38580ec955..237f629e67a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.Cleanup;
@@ -65,6 +66,7 @@ import 
org.apache.pulsar.broker.service.schema.SchemaStorageFormat.SchemaLocator
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
@@ -76,6 +78,7 @@ import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.ConsumerImpl;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
+import org.apache.pulsar.client.impl.ProducerImpl;
 import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
 import org.apache.pulsar.client.impl.schema.ProtobufSchema;
 import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
@@ -94,6 +97,7 @@ import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.MetadataCache;
 import org.apache.pulsar.metadata.api.MetadataSerde;
 import org.apache.pulsar.metadata.api.Stat;
+import org.awaitility.Awaitility;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -1452,9 +1456,9 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
     }
 
     /**
-     * This test validates that consumer/producers should recover on topic 
whose 
+     * This test validates that consumer/producers should recover on topic 
whose
      * schema ledgers are not able to open due to non-recoverable error.
-     * 
+     *
      * @throws Exception
      */
     @Test
@@ -1521,4 +1525,29 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         consumer.close();
         producer.close();
     }
+
+    @Test
+    public void testPendingQueueSizeIfIncompatible() throws Exception {
+        final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + 
"/ns");
+        admin.namespaces().createNamespace(namespace, 
Sets.newHashSet(CLUSTER_NAME));
+        admin.namespaces().setSchemaCompatibilityStrategy(namespace, 
SchemaCompatibilityStrategy.ALWAYS_INCOMPATIBLE);
+        final String topic = BrokerTestUtil.newUniqueName(namespace + "/tp");
+        admin.topics().createNonPartitionedTopic(topic);
+
+        ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
+                
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
+        producer.newMessage(Schema.STRING).value("msg").sendAsync();
+        AtomicReference<CompletableFuture<MessageId>> latestSend = new 
AtomicReference<>();
+        for (int i = 0; i < 100; i++) {
+            
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
+        }
+        Awaitility.await().untilAsserted(() -> {
+            assertTrue(latestSend.get().isDone());
+            assertEquals(producer.getPendingQueueSize(), 0);
+        });
+
+        // cleanup.
+        producer.close();
+        admin.topics().delete(topic, false);
+    }
 }
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index e311048a4a9..3119e5d535d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1717,7 +1717,8 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
      * This queue is not thread safe.
      */
     protected static class OpSendMsgQueue implements Iterable<OpSendMsg> {
-        private final Queue<OpSendMsg> delegate = new ArrayDeque<>();
+        @VisibleForTesting
+        final Queue<OpSendMsg> delegate = new ArrayDeque<>();
         private int forEachDepth = 0;
         private List<OpSendMsg> postponedOpSendMgs;
         private final AtomicInteger messagesCount = new AtomicInteger(0);
@@ -1774,7 +1775,35 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
 
         @Override
         public Iterator<OpSendMsg> iterator() {
-            return delegate.iterator();
+            Iterator<OpSendMsg> delegateIterator = delegate.iterator();
+            return new Iterator<OpSendMsg>() {
+                OpSendMsg currentOp;
+
+                @Override
+                public boolean hasNext() {
+                    return delegateIterator.hasNext();
+                }
+
+                @Override
+                public OpSendMsg next() {
+                    currentOp = delegateIterator.next();
+                    return currentOp;
+                }
+
+                @Override
+                public void remove() {
+                    delegateIterator.remove();
+                    if (currentOp != null) {
+                       messagesCount.addAndGet(-currentOp.numMessagesInBatch);
+                       currentOp = null;
+                     }
+                }
+
+                @Override
+                public void forEachRemaining(Consumer<? super OpSendMsg> 
action) {
+                    delegateIterator.forEachRemaining(action);
+                }
+            };
         }
     }
 
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
index efcc06bede3..28787568dd1 100644
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/OpSendMsgQueueTest.java
@@ -23,6 +23,7 @@ import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import com.google.common.collect.Lists;
 import java.util.Arrays;
+import java.util.Iterator;
 import org.apache.pulsar.client.impl.metrics.LatencyHistogram;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -83,4 +84,21 @@ public class OpSendMsgQueueTest {
         // then
         assertEquals(Lists.newArrayList(queue), Arrays.asList(opSendMsg, 
opSendMsg2, opSendMsg3, opSendMsg4));
     }
+
+    @Test
+    public void testIteratorRemove() {
+        ProducerImpl.OpSendMsgQueue queue = new ProducerImpl.OpSendMsgQueue();
+        for (int i = 0; i < 10; i++) {
+            queue.add(createDummyOpSendMsg());
+        }
+
+        Iterator<ProducerImpl.OpSendMsg> iterator = queue.iterator();
+        while (iterator.hasNext()) {
+            iterator.next();
+            iterator.remove();
+        }
+        // Verify: the result of "messagesCount()" is 0 after removed all 
items.
+        assertEquals(queue.delegate.size(), 0);
+        assertEquals(queue.messagesCount(), 0);
+    }
 }

Reply via email to