This is an automated email from the ASF dual-hosted git repository.

xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new df2c619b14d [fix][client] Fix producer publishing getting stuck after 
message with incompatible schema is discarded (#24282)
df2c619b14d is described below

commit df2c619b14d277d8df82ce9a0ab1ab146cf51b7f
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 12 19:15:25 2025 +0800

    [fix][client] Fix producer publishing getting stuck after message with 
incompatible schema is discarded (#24282)
    
    Fixes #24262
    
    Main Issue: #24262
    
    ### Motivation
    
    The issue is a regression of https://github.com/apache/pulsar/pull/24178.
    It occurs as follows:
    
    - Publish msg 1, which has an incompatible schema
    - Publish msg 2, which has an incompatible schema
    - The first message's schema failed to register because it is incompatible
      - The message was discarded.
      - Issue: discarding it did not trigger schema registration for the
        following messages, so the producer stopped publishing.
    
    ### Modifications
    
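    After a message with an incompatible schema is discarded ("Event 3-1-2"),
    trigger schema registration for the next pending message that needs one,
    instead of leaving the producer stuck. Publishing now stays paused only when
    `pauseSendingToPreservePublishOrderOnSchemaRegFailure` is enabled ("Event 3-1-1").
    Also re-enable `SchemaTest#testPendingQueueSizeIfIncompatible`. The test now
    sends a message with a compatible schema after the messages with the broken
    schema have been discarded.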
---
 .../java/org/apache/pulsar/schema/SchemaTest.java    | 20 +++++++++++++++-----
 .../org/apache/pulsar/client/impl/ProducerImpl.java  | 15 +++++++++++----
 2 files changed, 26 insertions(+), 9 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 711e8ba5ad7..07c626a549d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -1528,8 +1528,7 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         producer.close();
     }
 
-    // This test fails consistently, disabling until it is fixed. Issue 
https://github.com/apache/pulsar/issues/24262
-    @Test(enabled = false)
+    @Test
     public void testPendingQueueSizeIfIncompatible() throws Exception {
         final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT + 
"/ns");
         admin.namespaces().createNamespace(namespace, 
Sets.newHashSet(CLUSTER_NAME));
@@ -1538,17 +1537,28 @@ public class SchemaTest extends 
MockedPulsarServiceBaseTest {
         admin.topics().createNonPartitionedTopic(topic);
 
         ProducerImpl producer = (ProducerImpl) 
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
-                
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
-        producer.newMessage(Schema.STRING).value("msg").sendAsync();
+                
.maxPendingMessages(1000).enableBatching(false).topic(topic).create();
+        producer.newMessage(Schema.STRING).value("msg-1").sendAsync();
         AtomicReference<CompletableFuture<MessageId>> latestSend = new 
AtomicReference<>();
         for (int i = 0; i < 100; i++) {
-            
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
+            final String msg = "msg-with-broken-schema-" + i;
+            
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v
 -> {
+                log.info("send complete {}", msg);
+                return null;
+            }).exceptionally(ex -> {
+                log.error("failed to send {}", msg, ex);
+                return null;
+            }));
         }
+        // Verify: msgs with broken schema will be discarded.
         Awaitility.await().untilAsserted(() -> {
             assertTrue(latestSend.get().isDone());
             assertEquals(producer.getPendingQueueSize(), 0);
         });
 
+        // Verify: msgs with compatible schema can be sent successfully.
+        producer.newMessage(Schema.STRING).value("msg-2").sendAsync();
+
         // cleanup.
         producer.close();
         admin.topics().delete(topic, false);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 21b79da9d5a..9960af6046a 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -2464,6 +2464,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
      *     3-1-1. If {@link 
#pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all 
following
      *       publishing to avoid out-of-order issue.
      *     3-1-2. Otherwise, discard the failed message anc continuously 
publishing the following messages.
+     *            Additionally, the following messages may need schema 
registration also.
      *   3-2. The new schema registration failed due to other error, retry 
registering.
      * Note: Since the current method accesses & modifies {@link 
#pendingMessages}, you should acquire a lock on
      *       {@link ProducerImpl} before calling method.
@@ -2482,6 +2483,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
         MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
         OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
+        boolean pausedSendingToPreservePublishOrderOnSchemaRegFailure = false;
         while (msgIterator.hasNext()) {
             OpSendMsg op = msgIterator.next();
             if (loopStartAt != null) {
@@ -2526,6 +2528,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                                 + " 2) Unload topic on target cluster. Schema 
details: {}",
                                 topic, producerName, 
SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
                         loopEndDueToSchemaRegisterNeeded = op;
+                        pausedSendingToPreservePublishOrderOnSchemaRegFailure 
= true;
                         break;
                     }
                     // Event 3-1-2.
@@ -2581,7 +2584,7 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
         }
         cnx.ctx().flush();
 
-        // "Event 1-1" or "Event 3-1-1" or "Event 3-2".
+        // "Event 1-1" or "Event 3-1-1" or "Event 3-1-2" or "Event 3-2".
         if (loopEndDueToSchemaRegisterNeeded != null) {
             if (compareAndSetState(State.Connecting, State.Ready)) {
                 // "Event 1-1" happens after "Event 3-1-1".
@@ -2589,15 +2592,19 @@ public class ProducerImpl<T> extends ProducerBase<T> 
implements TimerTask, Conne
                 // after users changed the compatibility strategy to make the 
schema is compatible.
                 tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, 
loopEndDueToSchemaRegisterNeeded.callback,
                     expectedEpoch);
-            } else if (!failedIncompatibleSchema && 
compareAndSetState(State.RegisteringSchema, State.Ready)) {
-                // "Event 2-1" or "Event 3-2".
+            } else if (pausedSendingToPreservePublishOrderOnSchemaRegFailure) {
+                // Nothing to do if the event is "Event 3-1-1", just keep 
stuck.
+                return;
+            } else if (compareAndSetState(State.RegisteringSchema, 
State.Ready)) {
+                // "Event 2-1" or "Event 3-1-2" or "Event 3-2".
                 // "pendingMessages" has more messages to register new schema.
                 // This operation will not be conflict with another schema 
registration because both operations are
                 // attempt to acquire the same lock "ProducerImpl.this".
                 tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg, 
loopEndDueToSchemaRegisterNeeded.callback,
                         expectedEpoch);
             }
-            // Nothing to do if the event is "Event 3-1-1", just keep stuck.
+            // Schema registration will trigger a new 
"recoverProcessOpSendMsgFrom", so return here. If failed to switch
+            // state, it means another task will trigger a new 
"recoverProcessOpSendMsgFrom".
             return;
         } else if (latestMsgAttemptedRegisteredSchema != null) {
             // Event 2-2 or "Event 3-1-2".

Reply via email to