This is an automated email from the ASF dual-hosted git repository.
xiangying pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new df2c619b14d [fix][client] Fix producer publishing getting stuck after
message with incompatible schema is discarded (#24282)
df2c619b14d is described below
commit df2c619b14d277d8df82ce9a0ab1ab146cf51b7f
Author: fengyubiao <[email protected]>
AuthorDate: Mon May 12 19:15:25 2025 +0800
[fix][client] Fix producer publishing getting stuck after message with
incompatible schema is discarded (#24282)
Fixes #24262
Main Issue: #24262
### Motivation
The issue is a regression of https://github.com/apache/pulsar/pull/24178.
The issue occurs as follows:
- Publish msg 1, which has an incompatible schema
- Publish msg 2, which has an incompatible schema
- The first message's schema failed to register because it is incompatible
- The message was discarded.
- Issue: discarding the message did not trigger schema registration for
the subsequent pending messages, so publishing got stuck.
### Modifications
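- In `ProducerImpl`, distinguish "Event 3-1-1" from "Event 3-1-2".
  - "Event 3-1-1": sending is paused to preserve publish order when
    `pauseSendingToPreservePublishOrderOnSchemaRegFailure` is enabled. The
    producer now stays paused only in this case.
  - "Event 3-1-2": the message with the incompatible schema was discarded.
    The producer now switches its state from `RegisteringSchema` back to
    `Ready` and triggers schema registration for the next pending message
    that needs it.
- Re-enable `SchemaTest#testPendingQueueSizeIfIncompatible`.
- Extend the test to verify that a message with a compatible schema can
  still be sent after the incompatible ones are discarded.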
---
.../java/org/apache/pulsar/schema/SchemaTest.java | 20 +++++++++++++++-----
.../org/apache/pulsar/client/impl/ProducerImpl.java | 15 +++++++++++----
2 files changed, 26 insertions(+), 9 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
index 711e8ba5ad7..07c626a549d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/schema/SchemaTest.java
@@ -1528,8 +1528,7 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
producer.close();
}
- // This test fails consistently, disabling until it is fixed. Issue
https://github.com/apache/pulsar/issues/24262
- @Test(enabled = false)
+ @Test
public void testPendingQueueSizeIfIncompatible() throws Exception {
final String namespace = BrokerTestUtil.newUniqueName(PUBLIC_TENANT +
"/ns");
admin.namespaces().createNamespace(namespace,
Sets.newHashSet(CLUSTER_NAME));
@@ -1538,17 +1537,28 @@ public class SchemaTest extends
MockedPulsarServiceBaseTest {
admin.topics().createNonPartitionedTopic(topic);
ProducerImpl producer = (ProducerImpl)
pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES())
-
.maxPendingMessages(50).enableBatching(false).topic(topic).create();
- producer.newMessage(Schema.STRING).value("msg").sendAsync();
+
.maxPendingMessages(1000).enableBatching(false).topic(topic).create();
+ producer.newMessage(Schema.STRING).value("msg-1").sendAsync();
AtomicReference<CompletableFuture<MessageId>> latestSend = new
AtomicReference<>();
for (int i = 0; i < 100; i++) {
-
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync());
+ final String msg = "msg-with-broken-schema-" + i;
+
latestSend.set(producer.newMessage(Schema.BOOL).value(false).sendAsync().thenApply(v
-> {
+ log.info("send complete {}", msg);
+ return null;
+ }).exceptionally(ex -> {
+ log.error("failed to send {}", msg, ex);
+ return null;
+ }));
}
+ // Verify: msgs with broken schema will be discarded.
Awaitility.await().untilAsserted(() -> {
assertTrue(latestSend.get().isDone());
assertEquals(producer.getPendingQueueSize(), 0);
});
+ // Verify: msgs with compatible schema can be sent successfully.
+ producer.newMessage(Schema.STRING).value("msg-2").sendAsync();
+
// cleanup.
producer.close();
admin.topics().delete(topic, false);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index 21b79da9d5a..9960af6046a 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -2464,6 +2464,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
* 3-1-1. If {@link
#pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all
following
* publishing to avoid out-of-order issue.
* 3-1-2. Otherwise, discard the failed message anc continuously
publishing the following messages.
+ * Additionally, the following messages may need schema
registration also.
* 3-2. The new schema registration failed due to other error, retry
registering.
* Note: Since the current method accesses & modifies {@link
#pendingMessages}, you should acquire a lock on
* {@link ProducerImpl} before calling method.
@@ -2482,6 +2483,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
+ boolean pausedSendingToPreservePublishOrderOnSchemaRegFailure = false;
while (msgIterator.hasNext()) {
OpSendMsg op = msgIterator.next();
if (loopStartAt != null) {
@@ -2526,6 +2528,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
+ " 2) Unload topic on target cluster. Schema
details: {}",
topic, producerName,
SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
loopEndDueToSchemaRegisterNeeded = op;
+ pausedSendingToPreservePublishOrderOnSchemaRegFailure
= true;
break;
}
// Event 3-1-2.
@@ -2581,7 +2584,7 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
}
cnx.ctx().flush();
- // "Event 1-1" or "Event 3-1-1" or "Event 3-2".
+ // "Event 1-1" or "Event 3-1-1" or "Event 3-1-2" or "Event 3-2".
if (loopEndDueToSchemaRegisterNeeded != null) {
if (compareAndSetState(State.Connecting, State.Ready)) {
// "Event 1-1" happens after "Event 3-1-1".
@@ -2589,15 +2592,19 @@ public class ProducerImpl<T> extends ProducerBase<T>
implements TimerTask, Conne
// after users changed the compatibility strategy to make the
schema is compatible.
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg,
loopEndDueToSchemaRegisterNeeded.callback,
expectedEpoch);
- } else if (!failedIncompatibleSchema &&
compareAndSetState(State.RegisteringSchema, State.Ready)) {
- // "Event 2-1" or "Event 3-2".
+ } else if (pausedSendingToPreservePublishOrderOnSchemaRegFailure) {
+ // Nothing to do if the event is "Event 3-1-1", just keep
stuck.
+ return;
+ } else if (compareAndSetState(State.RegisteringSchema,
State.Ready)) {
+ // "Event 2-1" or "Event 3-1-2" or "Event 3-2".
// "pendingMessages" has more messages to register new schema.
// This operation will not be conflict with another schema
registration because both operations are
// attempt to acquire the same lock "ProducerImpl.this".
tryRegisterSchema(cnx, loopEndDueToSchemaRegisterNeeded.msg,
loopEndDueToSchemaRegisterNeeded.callback,
expectedEpoch);
}
- // Nothing to do if the event is "Event 3-1-1", just keep stuck.
+ // Schema registration will trigger a new
"recoverProcessOpSendMsgFrom", so return here. If failed to switch
+ // state, it means another task will trigger a new
"recoverProcessOpSendMsgFrom".
return;
} else if (latestMsgAttemptedRegisteredSchema != null) {
// Event 2-2 or "Event 3-1-2".