taklwu commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2718075417


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +246,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    if (!endpoint.isBufferedReplicationEndpoint()) {
+      // Non-buffering endpoints persist immediately
+      return true;
+    }
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return stagedWalSize >= endpoint.getMaxBufferSize()
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
+  }
+
+  @Nullable
+  // Returns IOException instead of throwing so callers can decide
+  // whether to retry (shipEdits) or best-effort log (run()).
+  private IOException persistLogPosition() {
+    if (lastShippedBatch == null) {
+      return null;
+    }
+
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+
+    if (endpoint.isBufferedReplicationEndpoint() && stagedWalSize > 0) {
+      source.getReplicationEndpoint().beforePersistingReplicationOffset();
+    }
+
+    stagedWalSize = 0;
+    lastStagedFlushTs = EnvironmentEdgeManager.currentTime();
+
+    // Clean up hfile references
+    try {
+      for (Entry entry : entriesForCleanUpHFileRefs) {
+        cleanUpHFileRefs(entry.getEdit());
+        LOG.trace("shipped entry {}: ", entry);
+      }
+    } catch (IOException e) {
+      LOG.warn("{} threw exception while cleaning up hfile refs", endpoint.getClass().getName(), e);
+      return e;

Review Comment:
   ```suggestion
         throw e;
   ```



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -98,6 +104,13 @@ public final void run() {
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
+      // check if flush needed for WAL backup, this is need for timeout based flush
+      if (shouldPersistLogPosition()) {
+        IOException error = persistLogPosition();
+        if (error != null) {
+          LOG.warn("Exception while persisting replication state", error);
+        }
+      }

Review Comment:
   nit: I don't think you need to change the return type to `IOException`. 
Instead, declare `throws IOException` and use `try-catch` at the call site 
if a caller really needs to handle the exception differently. 
   
   ```suggestion
         if (shouldPersistLogPosition()) {
           persistLogPosition();
         }
   ```
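
For illustration only, here is a minimal sketch of the throw-and-catch pattern suggested above. It is not the HBase code: `ShipperSketch`, `runOnce`, `shipEditsOnce`, the `failCleanup` flag, and the `warnings` counter are hypothetical stand-ins, and only the method name `persistLogPosition` and the two caller roles (`run()` logging best-effort, `shipEdits` retrying) come from the diff.

```java
import java.io.IOException;

// Hypothetical stand-in for ReplicationSourceShipper; bodies are illustrative only.
class ShipperSketch {
  private final boolean failCleanup; // assumption: simulates a failing hfile-ref cleanup
  int warnings = 0;                  // assumption: stands in for LOG.warn calls

  ShipperSketch(boolean failCleanup) {
    this.failCleanup = failCleanup;
  }

  // Throws instead of returning the exception, as in the suggested signature.
  void persistLogPosition() throws IOException {
    if (failCleanup) {
      throw new IOException("cleaning up hfile refs failed");
    }
  }

  // Caller in the style of run(): best-effort, record a warning and carry on.
  void runOnce() {
    try {
      persistLogPosition();
    } catch (IOException e) {
      warnings++;
    }
  }

  // Caller in the style of shipEdits: report failure so the caller can retry.
  boolean shipEditsOnce() {
    try {
      persistLogPosition();
      return true;
    } catch (IOException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    ShipperSketch failing = new ShipperSketch(true);
    failing.runOnce();
    System.out.println("warnings=" + failing.warnings
      + " shipped=" + failing.shipEditsOnce());
  }
}
```

Each caller still decides how to react, but the decision lives in an ordinary `catch` block rather than in a null check on a returned exception.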



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +246,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    if (!endpoint.isBufferedReplicationEndpoint()) {
+      // Non-buffering endpoints persist immediately
+      return true;
+    }
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return stagedWalSize >= endpoint.getMaxBufferSize()
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= 
endpoint.maxFlushInterval());
+  }
+
+  @Nullable
+  // Returns IOException instead of throwing so callers can decide
+  // whether to retry (shipEdits) or best-effort log (run()).
+  private IOException persistLogPosition() {

Review Comment:
   ```suggestion
     private void persistLogPosition() throws IOException {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.