taklwu commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2684251451


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,29 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before 
flushing to WAL backup
+  // file. So we return config value CONF_BACKUP_MAX_WAL_SIZE for
+  // ContinuousBackupReplicationEndpoint
+  // and -1 for other ReplicationEndpoint since they don't buffer.
+  // For other ReplicationEndpoint, everytime a WALEntryBatch is shipped, we 
update replication
+  // offset. Please check ReplicationSourceShipper#shouldFlushStagedWal()
+  default long getMaxBufferSize() {
+    return -1;
+  }
+
+  // WAL entries are buffered in ContinuousBackupReplicationEndpoint before 
flushing to WAL backup

Review Comment:
   nit: `ContinuousBackupReplicationEndpoint` is part of 
https://github.com/apache/hbase/pull/7591/ and it's yet committing to master, 
should we mention early in this change? 



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +239,41 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return (stagedWalSize >= 
source.getReplicationEndpoint().getMaxBufferSize())
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs
+          >= source.getReplicationEndpoint().maxFlushInterval());
+  }
+
+  private void persistLogPosition() {
+    if (lastShippedBatch == null) {
+      return;
+    }
+    if (stagedWalSize > 0) {
+      source.getReplicationEndpoint().beforePersistingReplicationOffset();
+    }
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Clean up hfile references
+    for (Entry entry : entriesForCleanUpHFileRefs) {
+      try {
+        cleanUpHFileRefs(entry.getEdit());
+      } catch (IOException e) {
+        LOG.warn("{} threw unknown exception:",
+          source.getReplicationEndpoint().getClass().getName(), e);
+      }

Review Comment:
   nit: will this be a behavior change because previously when 
`cleanUpHFileRefs` failed, it's throwing thru the function but here we're 
logging it only.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to