codelipenghui commented on code in PR #20128:
URL: https://github.com/apache/pulsar/pull/20128#discussion_r1171218012


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -268,6 +275,36 @@ protected void readMoreEntries() {
         }
     }
 
+    /**
+     * Determine if the messages are continuous, if they are not, the messages 
are discarded and rewind is called.
+     * Why are the messages discontinuous, for example: if there has something 
wrong when Replicator is processing
+     * messages "[1:1 ~ 3:3]", Replicator discards the unprocessed message. 
But a new batch messages "[4:1 ~ 6:6]"
+     * is received later, then these messages will be sent.
+     */
+    protected boolean isMessageContinuousAndRewindIfNot(List<Entry> entries) {

Review Comment:
   ```suggestion
       protected boolean hasMessageSkipped(List<Entry> entries) {
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -268,6 +275,36 @@ protected void readMoreEntries() {
         }
     }
 
+    /**
+     * Determine if the messages are continuous, if they are not, the messages 
are discarded and rewind is called.
+     * Why are the messages discontinuous, for example: if there has something 
wrong when Replicator is processing
+     * messages "[1:1 ~ 3:3]", Replicator discards the unprocessed message. 
But a new batch messages "[4:1 ~ 6:6]"
+     * is received later, then these messages will be sent.
+     */
+    protected boolean isMessageContinuousAndRewindIfNot(List<Entry> entries) {
+        if (CollectionUtils.isEmpty(entries)){
+            return true;
+        }
+
+        PositionImpl markDeletedPos = (PositionImpl) 
cursor.getMarkDeletedPosition();
+        PositionImpl lastSentPos = (PositionImpl) lastSent;
+        PositionImpl lastProcessedPos = markDeletedPos.compareTo(lastSentPos) 
> 0 ? markDeletedPos : lastSentPos;
+
+        PositionImpl firstMessageReceived = (PositionImpl) 
entries.get(0).getPosition();
+        PositionImpl expectedFirstMessage = 
managedLedger.getNextValidPosition(lastProcessedPos);
+        if (expectedFirstMessage.compareTo(firstMessageReceived) >= 0) {
+            return true;
+        }
+
+        log.warn("[{}] discontinuous messages was aborted. first message 
received: {}, expected first message: {}",

Review Comment:
   ```suggestion
           log.warn("[{}] Detected messages are skipped. first received: {}, 
last processed: {}",
   ```



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java:
##########
@@ -268,6 +275,36 @@ protected void readMoreEntries() {
         }
     }
 
+    /**
+     * Determine if the messages are continuous, if they are not, the messages 
are discarded and rewind is called.
+     * Why are the messages discontinuous, for example: if there has something 
wrong when Replicator is processing
+     * messages "[1:1 ~ 3:3]", Replicator discards the unprocessed message. 
But a new batch messages "[4:1 ~ 6:6]"
+     * is received later, then these messages will be sent.
+     */
+    protected boolean isMessageContinuousAndRewindIfNot(List<Entry> entries) {
+        if (CollectionUtils.isEmpty(entries)){
+            return true;
+        }
+
+        PositionImpl markDeletedPos = (PositionImpl) 
cursor.getMarkDeletedPosition();
+        PositionImpl lastSentPos = (PositionImpl) lastSent;
+        PositionImpl lastProcessedPos = markDeletedPos.compareTo(lastSentPos) 
> 0 ? markDeletedPos : lastSentPos;
+
+        PositionImpl firstMessageReceived = (PositionImpl) 
entries.get(0).getPosition();
+        PositionImpl expectedFirstMessage = 
managedLedger.getNextValidPosition(lastProcessedPos);
+        if (expectedFirstMessage.compareTo(firstMessageReceived) >= 0) {
+            return true;
+        }

Review Comment:
   It's better to change the method name to `hasMessageSkipped ` because we 
allow duplicated messages.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ShadowReplicator.java:
##########
@@ -113,6 +117,7 @@ protected boolean replicateEntries(List<Entry> entries) {
 
                 // Increment pending messages for messages produced locally
                 PENDING_MESSAGES_UPDATER.incrementAndGet(this);
+                lastSent = entry.getPosition();

Review Comment:
   Same as the comment in GeoReplicator.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/GeoPersistentReplicator.java:
##########
@@ -183,6 +188,7 @@ protected boolean replicateEntries(List<Entry> entries) {
                     msg.getMessageBuilder().clearTxnidLeastBits();
                     // Increment pending messages for messages produced locally
                     PENDING_MESSAGES_UPDATER.incrementAndGet(this);
+                    lastSent = entry.getPosition();

Review Comment:
   The `lastSent` should be updated after the producer sends it successfully.
   We are using the async API to send messages.
   It looks like the inflight messages will also be treated as `lastSent`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to