lhotari commented on code in PR #24178:
URL: https://github.com/apache/pulsar/pull/24178#discussion_r2042128715


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java:
##########
@@ -108,6 +108,15 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx, OpSendMsg op, long sourceLI
         if (pendingLId != null && pendingEId != null
                 && (pendingLId < lastPersistedSourceLedgerId || (pendingLId.longValue() == lastPersistedSourceLedgerId
                   && pendingEId.longValue() <= lastPersistedSourceEntryId))) {
+            if (MessageImpl.SchemaState.Broken.equals(op.msg.getSchemaState())) {
+                log.error("[{}] [{}] Replication due to incompatible schem and the replicator will be stuck by producer"
+                                + " queue size limitation."
+                                + " Latest published entry {}:{}, Entry who has broken schema: {}:{},"
+                                + " latest persisted source entry: {}:{}, pending queue size: {}.",

Review Comment:
   Would it be possible to improve the clarity of this error message? 
   
   * "Replication due to incompatible schem" is not very clear.
     * What does that mean exactly?
   * " and the replicator will be stuck by producer queue size limitation" 
     * Is there a way to make this more understandable and possibly 
[actionable](https://developers.google.com/tech-writing/error-messages/show-fix)?
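
   To illustrate the kind of wording meant here, a rough sketch follows. It assumes the intent is "this entry cannot be replicated because its schema is incompatible on the remote cluster, so replication stops at this entry". The class name, the helper name, and the suggested remedy in the last sentence are hypothetical and not part of the PR or of Pulsar; the real code would keep using `log.error` with `{}` placeholders.

```java
public class ReplicationErrorMessageSketch {

    // Hypothetical helper (not a Pulsar API): builds a message that states
    // what failed, what the consequence is, and what an operator could do.
    static String brokenSchemaMessage(String topic, String producerName,
                                      long brokenLedgerId, long brokenEntryId,
                                      long persistedLedgerId, long persistedEntryId,
                                      int pendingQueueSize) {
        return String.format(
                "[%s] [%s] Cannot replicate source entry %d:%d because its schema is incompatible"
                        + " with the remote cluster. Replication is blocked at this entry and the"
                        + " producer pending queue will fill up."
                        + " Latest persisted source entry: %d:%d, pending queue size: %d."
                        // Assumed remedy; adjust to whatever actually unblocks the replicator.
                        + " To resume replication, resolve the schema incompatibility on the remote cluster.",
                topic, producerName, brokenLedgerId, brokenEntryId,
                persistedLedgerId, persistedEntryId, pendingQueueSize);
    }

    public static void main(String[] args) {
        // Example values only.
        System.out.println(brokenSchemaMessage(
                "persistent://public/default/t1", "pulsar.repl.r1", 12L, 34L, 12L, 33L, 1000));
    }
}
```

   The point is the structure (cause, consequence, suggested fix), not the exact text.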
   
   



##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -199,6 +201,10 @@ public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfiguration
         this.userProvidedProducerName = StringUtils.isNotBlank(producerName);
         this.partitionIndex = partitionIndex;
         this.pendingMessages = createPendingMessagesQueue();
+        // Replication need be stuck when a message can not be replicated due to failed schema registration. Otherwise,
+        // it may cause an out-of-order issue, and it may lead to a messages lost issue if users enabled deduplication
+        // on the remote side.
+        this.needStuckIfSchemaIncompatible = conf.isReplProducer();

Review Comment:
   The field name `needStuckIfSchemaIncompatible` is hard to understand. Would it be possible to rename it?
   The comment "Replication need be stuck when a message can not be replicated due to failed schema registration." doesn't make the intent much clearer either. Perhaps rewording the sentence would help.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

