poorbarcode commented on code in PR #20128:
URL: https://github.com/apache/pulsar/pull/20128#discussion_r1171247540


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -268,6 +275,36 @@ protected void readMoreEntries() {
         }
     }
 
+    /**
+     * Determine if the messages are continuous, if they are not, the messages 
are discarded and rewind is called.
+     * Why are the messages discontinuous, for example: if there has something 
wrong when Replicator is processing
+     * messages "[1:1 ~ 3:3]", Replicator discards the unprocessed message. 
But a new batch messages "[4:1 ~ 6:6]"
+     * is received later, then these messages will be sent.
+     */
+    protected boolean isMessageContinuousAndRewindIfNot(List<Entry> entries) {
+        if (CollectionUtils.isEmpty(entries)){
+            return true;
+        }
+
+        PositionImpl markDeletedPos = (PositionImpl) 
cursor.getMarkDeletedPosition();
+        PositionImpl lastSentPos = (PositionImpl) lastSent;
+        PositionImpl lastProcessedPos = markDeletedPos.compareTo(lastSentPos) 
> 0 ? markDeletedPos : lastSentPos;
+
+        PositionImpl firstMessageReceived = (PositionImpl) 
entries.get(0).getPosition();
+        PositionImpl expectedFirstMessage = 
managedLedger.getNextValidPosition(lastProcessedPos);
+        if (expectedFirstMessage.compareTo(firstMessageReceived) >= 0) {
+            return true;
+        }

Review Comment:
   To make the logic of `replicateEntries` easier to read, I changed the method 
name to `checkNoMessageSkipped`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to