poorbarcode commented on code in PR #20128:
URL: https://github.com/apache/pulsar/pull/20128#discussion_r1171233767
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -268,6 +275,36 @@ protected void readMoreEntries() {
}
}
+ /**
+ * Determine if the messages are continuous, if they are not, the messages are discarded and rewind is called.
+ * Why are the messages discontinuous, for example: if there has something wrong when Replicator is processing
+ * messages "[1:1 ~ 3:3]", Replicator discards the unprocessed message. But a new batch messages "[4:1 ~ 6:6]"
+ * is received later, then these messages will be sent.
+ */
+ protected boolean isMessageContinuousAndRewindIfNot(List<Entry> entries) {
+ if (CollectionUtils.isEmpty(entries)){
+ return true;
+ }
+
+ PositionImpl markDeletedPos = (PositionImpl) cursor.getMarkDeletedPosition();
+ PositionImpl lastSentPos = (PositionImpl) lastSent;
+ PositionImpl lastProcessedPos = markDeletedPos.compareTo(lastSentPos) > 0 ? markDeletedPos : lastSentPos;
+
+ PositionImpl firstMessageReceived = (PositionImpl) entries.get(0).getPosition();
+ PositionImpl expectedFirstMessage = managedLedger.getNextValidPosition(lastProcessedPos);
+ if (expectedFirstMessage.compareTo(firstMessageReceived) >= 0) {
+ return true;
+ }
Review Comment:
Good suggestion. Already fixed.