codelipenghui commented on code in PR #24178:
URL: https://github.com/apache/pulsar/pull/24178#discussion_r2057426985
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java:
##########
@@ -108,6 +108,15 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx,
OpSendMsg op, long sourceLI
if (pendingLId != null && pendingEId != null
&& (pendingLId < lastPersistedSourceLedgerId ||
(pendingLId.longValue() == lastPersistedSourceLedgerId
&& pendingEId.longValue() <= lastPersistedSourceEntryId))) {
+ if
(MessageImpl.SchemaState.Broken.equals(op.msg.getSchemaState())) {
+ log.error("[{}] [{}] Replication is paused because the schema
is incompatible with the remote"
+ + " cluster, please modify the schema
compatibility for the remote cluster."
+ + " Latest published entry {}:{}, Entry who
has broken schema: {}:{},"
Review Comment:
```suggestion
+ " Latest published entry {}:{}, Entry who
has incompatible schema: {}:{},"
```
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/GeoReplicationProducerImpl.java:
##########
@@ -108,6 +108,15 @@ private void ackReceivedReplicatedMsg(ClientCnx cnx,
OpSendMsg op, long sourceLI
if (pendingLId != null && pendingEId != null
&& (pendingLId < lastPersistedSourceLedgerId ||
(pendingLId.longValue() == lastPersistedSourceLedgerId
&& pendingEId.longValue() <= lastPersistedSourceEntryId))) {
+ if
(MessageImpl.SchemaState.Broken.equals(op.msg.getSchemaState())) {
+ log.error("[{}] [{}] Replication is paused because the schema
is incompatible with the remote"
+ + " cluster, please modify the schema
compatibility for the remote cluster."
+ + " Latest published entry {}:{}, Entry who
has broken schema: {}:{},"
Review Comment:
Would it be better to add a metric here, or is there an existing metric for
replication failures?
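To illustrate what such a metric could look like, here is a minimal sketch. It assumes no suitable counter exists yet; `ReplicationSchemaFailureStats` and its methods are hypothetical names, not existing Pulsar classes, and how the value would be exposed through the stats or metrics endpoint is left open.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical counter for illustration only; not an existing Pulsar class. */
class ReplicationSchemaFailureStats {
    // LongAdder keeps increments cheap when several threads record failures.
    private final LongAdder incompatibleSchemaPauses = new LongAdder();

    /** Would be called next to the log.error(...) above when a Broken schema state is seen. */
    void recordIncompatibleSchemaPause() {
        incompatibleSchemaPauses.increment();
    }

    /** Current number of recorded pauses caused by an incompatible schema. */
    long getIncompatibleSchemaPauses() {
        return incompatibleSchemaPauses.sum();
    }
}
```

A counter like this would let operators alert on paused replication instead of having to notice the error log.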
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java:
##########
@@ -2399,41 +2422,115 @@ protected void updateLastSeqPushed(OpSendMsg op) {
}
}
- // Must acquire a lock on ProducerImpl.this before calling method.
- private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from,
long expectedEpoch) {
+ /**
+ * There are following events that will call this method.
+ * 1. Republish messages in {@link #pendingMessages} after a reconnect.
+ * 1-1. Using multiple version producer, and there is a message has new
schema that should be registered.
+ * 1-2. No message should register new schema.
+ * 2. If using multiple version producer, the new schema was registered
successfully.
+ * 2-1. There is another message needs to register new schema,which is
in {@link #pendingMessages}.
+ * 2-2. {@link #pendingMessages} has no other messages that need to
register new schema.
+ * 3. If using multiple version producer, the new schema was failed to
registered.
+ * 3-1. If the new schema is incompatible.
+ * 3-1-1. If {@link
#pauseSendingToPreservePublishOrderOnSchemaRegFailure} is true pause all
following
+ * publishing to avoid out-of-order issue.
+ * 3-1-2. Otherwise, discard the failed message anc continuously
publishing the following messages.
+ * 3-2. The new schema registration failed due to other error, retry
registering.
+ * Note: Since the current method accesses & modifies {@link
#pendingMessages}, you should acquire a lock on
+ * {@link ProducerImpl} before calling method.
+ */
+ private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl
latestMsgAttemptedRegisteredSchema,
+ boolean failedIncompatibleSchema,
long expectedEpoch) {
if (expectedEpoch != this.connectionHandler.getEpoch() || cnx() ==
null) {
// In this case, the cnx passed to this method is no longer the
active connection. This method will get
// called again once the new connection registers the producer
with the broker.
log.info("[{}][{}] Producer epoch mismatch or the current
connection is null. Skip re-sending the "
+ " {} pending messages since they will deliver
using another connection.", topic,
- producerName,
- pendingMessages.messagesCount());
+ producerName, pendingMessages.messagesCount());
return;
}
final boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() <
brokerChecksumSupportedVersion();
Iterator<OpSendMsg> msgIterator = pendingMessages.iterator();
- OpSendMsg pendingRegisteringOp = null;
+ MessageImpl loopStartAt = latestMsgAttemptedRegisteredSchema;
+ OpSendMsg loopEndDueToSchemaRegisterNeeded = null;
while (msgIterator.hasNext()) {
OpSendMsg op = msgIterator.next();
- if (from != null) {
- if (op.msg == from) {
- from = null;
+ if (loopStartAt != null) {
+ if (op.msg == loopStartAt) {
+ loopStartAt = null;
} else {
continue;
}
}
if (op.msg != null) {
- if (op.msg.getSchemaState() == None) {
- if (!rePopulateMessageSchema(op.msg)) {
- pendingRegisteringOp = op;
+ if (Broken.equals(op.msg.getSchemaState())) {
+ // "Event 1-1" happens after "Event 3-1-1".
+ // Maybe user has changed the schema compatibility
strategy, will retry to register the new schema.
+ if (pauseSendingToPreservePublishOrderOnSchemaRegFailure) {
Review Comment:
If we pause publishing messages for the replicator while new messages
still call the producer API to write messages, will it eventually fill up the
pending queue?
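The concern can be sketched with a simplified model. This is not `ProducerImpl`: `PausedPendingQueueSketch`, `maxPending`, and the immediate-ack behavior are assumptions made only to show how a bounded queue behaves once sending is paused.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical model of a bounded pending queue; not Pulsar's ProducerImpl. */
class PausedPendingQueueSketch {
    private final Queue<String> pending = new ArrayDeque<>();
    private final int maxPending;
    private boolean paused;

    PausedPendingQueueSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    /** Models the pause taken after an incompatible schema is detected. */
    void pause() {
        paused = true;
    }

    /** Returns false when the queue is full and the message cannot be accepted. */
    boolean send(String msg) {
        if (pending.size() >= maxPending) {
            return false;
        }
        pending.add(msg);
        if (!paused) {
            // Assumption: while not paused, the message is acked immediately and leaves the queue.
            pending.poll();
        }
        return true;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this model, once `pause()` is called nothing drains the queue, so after `maxPending` further calls every `send` is rejected. Whether the real producer blocks, fails, or applies backpressure at that point is exactly what the question above asks.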