poorbarcode commented on code in PR #24178:
URL: https://github.com/apache/pulsar/pull/24178#discussion_r2057588702


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -889,8 +897,15 @@ private void tryRegisterSchema(ClientCnx cnx, MessageImpl 
msg, SendCallback call
                 Throwable t = FutureUtil.unwrapCompletionException(ex);
                 log.warn("[{}] [{}] GetOrCreateSchema error", topic, 
producerName, t);
                 if (t instanceof 
PulsarClientException.IncompatibleSchemaException) {
-                    msg.setSchemaState(MessageImpl.SchemaState.Broken);
-                    callback.sendComplete(t, null);

Review Comment:
   Main change 4:  
   
   - The original implementation called `OpSendMsg.callbaclk.sendComplete()` 
here, it triggers a `recycle` of `OpSendMsg.callback`, but the object 
`OpSendMsg` is still in `producer. pendingMessages`, which may cause 
`OpSendMsg.callback` will be concurrently used.



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -2399,41 +2422,115 @@ protected void updateLastSeqPushed(OpSendMsg op) {
         }
     }
 
-    // Must acquire a lock on ProducerImpl.this before calling method.
-    private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, 
long expectedEpoch) {
+    /**
+     * There are following events that will call this method.
+     * 1. Republish messages in {@link #pendingMessages} after a reconnect.
+     *   1-1. Using multiple version producer, and there is a message has new 
schema that should be registered.
+     *   1-2. No message should register new schema.
+     * 2. If using multiple version producer, the new schema was registered 
successfully.
+     *   2-1. There is another message needs to register new schema,which is 
in {@link #pendingMessages}.
+     *   2-2. {@link #pendingMessages} has no other messages that need to 
register new schema.
+     * 3. If using multiple version producer, the new schema was failed to 
registered.
+     *   3-1. If the new schema is incompatible.
+     *     3-1-1. If {@link 
#pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all 
following
+     *       publishing to avoid out-of-order issue.
+     *     3-1-2. Otherwise, discard the failed message anc continuously 
publishing the following messages.
+     *   3-2. The new schema registration failed due to other error, retry 
registering.
+     * Note: Since the current method accesses & modifies {@link 
#pendingMessages}, you should acquire a lock on
+     *       {@link ProducerImpl} before calling method.
+     */
+    private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl 
latestMsgAttemptedRegisteredSchema,
+                                             boolean failedIncompatibleSchema, 
long expectedEpoch) {
         if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() == 
null) {
             // In this case, the cnx passed to this method is no longer the 
active connection. This method will get
             // called again once the new connection registers the producer 
with the broker.
             log.info("[{}][{}] Producer epoch mismatch or the current 
connection is null. Skip re-sending the "
                             + " {} pending messages since they will deliver 
using another connection.", topic,
-                    producerName,
-                    pendingMessages.messagesCount());
+                    producerName, pendingMessages.messagesCount());
             return;
         }
         final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < 
brokerChecksumSupportedVersion();
         Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
-        OpSendMsg pendingRegisteringOp = null;
+        MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
+        OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
         while (msgIterator.hasNext()) {
             OpSendMsg op = msgIterator.next();
-            if (from != null) {
-                if (op.msg == from) {
-                    from = null;
+            if (loopStartAt != null) {
+                if (op.msg == loopStartAt) {
+                    loopStartAt = null;
                 } else {
                     continue;
                 }
             }
             if (op.msg != null) {
-                if (op.msg.getSchemaState() == None) {
-                    if (!rePopulateMessageSchema(op.msg)) {
-                        pendingRegisteringOp = op;
+                if (Broken.equals(op.msg.getSchemaState())) {
+                    // "Event 1-1" happens after "Event 3-1-1".
+                    // Maybe user has changed the schema compatibility 
strategy, will retry to register the new schema.
+                    if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
+                        loopEndDueToSchemaRegisterNeeded = op;
                         break;
+                    } else {
+                        // This scenario will never happen because the message 
will be removed from the queue as soon
+                        // as it was set to "schemaState -> Broken".
+                        SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? 
op.msg.getSchemaInfoForReplicator()
+                                : op.msg.getSchemaInfo();
+                        log.error("[{}] [{}] A message attempts to register 
new schema, but failed. It should be"
+                            + " removed from the pending queue but not, which 
is not expected. {}",
+                            topic, producerName, 
SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
+                        releaseSemaphoreForSendOp(op);
+                        msgIterator.remove();
+                        op.recycle();
+                        continue;
                     }
-                } else if (op.msg.getSchemaState() == Broken) {
-                    op.recycle();
+                } else if (op.msg == latestMsgAttemptedRegisteredSchema && 
failedIncompatibleSchema
+                        && op.msg.getSchemaState() == None) {
+                    op.msg.setSchemaState(Broken);
+                    SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? 
op.msg.getSchemaInfoForReplicator()
+                            : op.msg.getSchemaInfo();
+                    // Event 3-1-1.
+                    // When a schema is incompatible, we need to pause the 
producer to preserve message order.
+                    // Otherwise, subsequent messages with compatible schemas 
would be delivered while this message
+                    // remains stuck, causing out-of-order delivery or 
potential message loss with deduplication.
+                    if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
+                        log.error("[{}] [{}] Publishing paused: message schema 
incompatible with target cluster."
+                                + " To resume publishing: 1) Adjust schema 
compatibility strategy on target cluster"
+                                + " 2) Unload topic on target cluster. Schema 
details: {}",
+                                topic, producerName, 
SchemaUtils.jsonifySchemaInfo(msgSchemaInfo, false));
+                        loopEndDueToSchemaRegisterNeeded = op;
+                        break;
+                    }
+                    // Event 3-1-2.
+                    // Give user a failed callback and remove the message from 
"pendingMessages".
+                    String failedMsg = format("[%s] [%s] incompatible schema 
%s", topic, producerName,
+                            String.valueOf(msgSchemaInfo));
+                    log.error(failedMsg);
+                    // The messages' release rely on "op.cmd"'s release, we 
need to initialize "op.cmd" and
+                    // release it to release "msg.payload".
+                    if (op.cmd == null) {
+                        op.rePopulate.run();
+                    }
+                    ReferenceCountUtil.safeRelease(op.cmd);
+                    try {
+                        // Need to protect ourselves from any exception being 
thrown in the future handler from the
+                        // application
+                        op.sendComplete(new 
IncompatibleSchemaException(failedMsg));
+                    } catch (Throwable t) {
+                        log.warn("Got exception while completing the failed 
publishing: {}", failedMsg, t);
+                    }
+                    releaseSemaphoreForSendOp(op);
                     msgIterator.remove();

Review Comment:
   Main change 1:
   
   The original implementation just removed `OpSendMsg` that contained a broken 
schema, but never released `OpSendMsg.cmd` which causes a memory leak



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to