lhotari commented on code in PR #24178:
URL: https://github.com/apache/pulsar/pull/24178#discussion_r2044548538


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -199,6 +201,10 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
         this.userProvidedProducerName = StringUtils.isNotBlank(producerName);
         this.partitionIndex = partitionIndex;
         this.pendingMessages = createPendingMessagesQueue();
+        // Replication need be stuck when a message can not be replicated due to failed schema registration. Otherwise,

Review Comment:
   ```suggestion
           // Replication needs to be paused when a message can not be replicated due to failed schema registration. Otherwise,
   ```



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -2422,28 +2461,84 @@ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long e
         }
         final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
         Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
-        OpSendMsg pendingRegisteringOp = null;
+        MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
+        OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
         while (msgIterator.hasNext()) {
             OpSendMsg op = msgIterator.next();
-            if (from != null) {
-                if (op.msg == from) {
-                    from = null;
+            if (loopStartAt != null) {
+                if (op.msg == loopStartAt) {
+                    loopStartAt = null;
                 } else {
                     continue;
                 }
             }
             if (op.msg != null) {
-                if (op.msg.getSchemaState() == None) {
-                    if (!rePopulateMessageSchema(op.msg)) {
-                        pendingRegisteringOp = op;
+                if (Broken.equals(op.msg.getSchemaState())) {
+                    // "Event 1-1" happens after "Event 3-1-1".
+                    // Maybe user has changed the schema compatibility strategy, will retry to register the new schema.
+                    if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
+                        loopEndDueToSchemaRegisterNeeded = op;
                         break;
+                    } else {
+                        // This scenario will never happen because the message will be removed from the queue as soon
+                        // as it was set to "schemaState -> Broken".
+                        SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator()
+                                : op.msg.getSchemaInfo();
+                        log.error("[{}] [{}] A message attempts to register new schema, but failed. It should be"
+                            + " removed from the pending queue but not, which is not expected. {}",
+                            topic, producerName, String.valueOf(msgSchemaInfo));
+                        releaseSemaphoreForSendOp(op);
+                        msgIterator.remove();
+                        op.recycle();
+                        continue;
                     }
-                } else if (op.msg.getSchemaState() == Broken) {
-                    op.recycle();
+                } else if (op.msg == latestMsgAttemptedRegisteredSchema && failedIncompatibleSchema
+                        && op.msg.getSchemaState() == None) {
+                    op.msg.setSchemaState(Broken);
+                    SchemaInfo msgSchemaInfo = op.msg.hasReplicateFrom() ? op.msg.getSchemaInfoForReplicator()
+                            : op.msg.getSchemaInfo();
+                    // Event 3-1-1.
+                    // New schema is incompatible, if users need to guarantee the publishing ordering, we should let
+                    // the producer be stuck until user changed the compatibility policy and unload the target topic.
+                    if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
+                        log.error("[{}] [{}] Producer was stuck due to incompatible schem, please adjust your"
+                                + " schema compatibility strategy and unload the topic on the target cluster. {}",
+                                topic, producerName, String.valueOf(msgSchemaInfo));

Review Comment:
   Please continue to make the comment and log message understandable. Here's 
something that Claude AI suggested when given the context of this PR:
   
   ```suggestion
                       // When a schema is incompatible, we need to pause the producer to preserve message order.
                       // Otherwise, subsequent messages with compatible schemas would be delivered while this message
                       // remains stuck, causing out-of-order delivery or potential message loss with deduplication.
                       if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
                           log.error("[{}] [{}] Publishing paused: message schema incompatible with target cluster."
                                   + " To resume publishing: 1) Adjust schema compatibility strategy on target cluster"
                                   + " 2) Unload topic on target cluster. Schema details: {}",
                                   topic, producerName, String.valueOf(msgSchemaInfo));
   ```
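
To make the ordering argument in the suggested comment concrete, here is a minimal, self-contained sketch. It is not the `ProducerImpl` code: the class `PauseOnBrokenSchemaSketch`, the `Pending` type, and the `resend` method are hypothetical names invented for illustration, and the non-pausing branch simply skips the broken message rather than reproducing the removal and recycling done in the PR. It only shows why stopping at the first message whose schema failed registration keeps later messages from overtaking it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Pulsar.
public class PauseOnBrokenSchemaSketch {

    enum SchemaState { READY, BROKEN }

    // A simplified stand-in for a queued send operation.
    static final class Pending {
        final long sequenceId;
        final SchemaState state;

        Pending(long sequenceId, SchemaState state) {
            this.sequenceId = sequenceId;
            this.state = state;
        }
    }

    /**
     * Walks the pending queue in order and returns the sequence ids that
     * would be resent. With pauseOnFailure, the walk stops at the first
     * BROKEN message, so nothing queued after it is sent ahead of it.
     * Without it, the BROKEN message is skipped and later messages go out.
     */
    static List<Long> resend(List<Pending> queue, boolean pauseOnFailure) {
        List<Long> sent = new ArrayList<>();
        for (Pending p : queue) {
            if (p.state == SchemaState.BROKEN) {
                if (pauseOnFailure) {
                    break; // pause: everything from here on waits
                }
                continue; // no pause: skip the broken message
            }
            sent.add(p.sequenceId);
        }
        return sent;
    }

    public static void main(String[] args) {
        List<Pending> queue = List.of(
                new Pending(1, SchemaState.READY),
                new Pending(2, SchemaState.BROKEN),
                new Pending(3, SchemaState.READY));

        System.out.println(resend(queue, true));  // prints [1]
        System.out.println(resend(queue, false)); // prints [1, 3]
    }
}
```

In the non-pausing run, message 3 reaches the target while message 2 never does, which is the out-of-order or lost-message outcome the suggested comment describes.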



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java:
##########
@@ -108,6 +108,15 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
         if (pendingLId != null && pendingEId != null
                 && (pendingLId < lastPersistedSourceLedgerId || (pendingLId.longValue() == lastPersistedSourceLedgerId
                   && pendingEId.longValue() <= lastPersistedSourceEntryId))) {
+            if (MessageImpl.SchemaState.Broken.equals(op.msg.getSchemaState())) {
+                log.error("[{}] [{}] Replication is stuck because there is a schema is incompatible with the remote"

Review Comment:
   ```suggestion
                   log.error("[{}] [{}] Replication is paused because the schema is incompatible with the remote"
   ```
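
For readers following the hunk above, the condition that guards this log call compares a pending (ledger id, entry id) pair against the last persisted pair in lexicographic order. The sketch below restates that comparison in isolation; the class `PositionCompareSketch` and the method `isAtOrBefore` are hypothetical names chosen for illustration and are not part of `GeoReplicationProducerImpl`.

```java
// Hypothetical illustration only; these names do not exist in Pulsar.
public class PositionCompareSketch {

    /**
     * Returns true when the pending position is at or before the last
     * persisted position: an earlier ledger always counts, and within
     * the same ledger the entry id decides.
     */
    static boolean isAtOrBefore(long pendingLedgerId, long pendingEntryId,
                                long lastLedgerId, long lastEntryId) {
        return pendingLedgerId < lastLedgerId
                || (pendingLedgerId == lastLedgerId && pendingEntryId <= lastEntryId);
    }

    public static void main(String[] args) {
        System.out.println(isAtOrBefore(4, 99, 5, 0));  // prints true (earlier ledger)
        System.out.println(isAtOrBefore(5, 10, 5, 10)); // prints true (same position)
        System.out.println(isAtOrBefore(5, 11, 5, 10)); // prints false (later entry)
    }
}
```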



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -2409,8 +2431,25 @@ protected void updateLastSeqPushed(OpSendMsg op) {
         }
     }
 
-    // Must acquire a lock on ProducerImpl.this before calling method.
-    private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from, long expectedEpoch) {
+    /**
+     * There are following events that will call this method.
+     * 1. Republish messages in {@link #pendingMessages} after a reconnect.
+     *   1-1. Using multiple version producer, and there is a message has new schema that should be registered.
+     *   1-2. No message should register new schema.
+     * 2. If using multiple version producer, the new schema was registered successfully.
+     *   2-1. There is another message needs to register new schema,which is in {@link #pendingMessages}.
+     *   2-2. {@link #pendingMessages} has no other messages that need to register new schema.
+     * 3. If using multiple version producer, the new schema was failed to registered.
+     *   3-1. If the new schema is incompatible.
+     *     3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true stuck all following

Review Comment:
   ```suggestion
        *     3-1-1. If {@link #pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all following
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]