Apache9 commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r2719322657


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java:
##########
@@ -283,4 +283,35 @@ public int getTimeout() {
    * @throws IllegalStateException if this service's state isn't FAILED.
    */
   Throwable failureCause();
+
+  /**
+   * @return true if this endpoint buffers WAL entries and requires explicit flush control before
+   *         persisting replication offsets.
+   */
+  default boolean isBufferedReplicationEndpoint() {

Review Comment:
   Personally, I do not think we need to expose this information to the shipper.
   
   The design here is that, when you use a different ReplicationEndpoint, you
   need to tune the shipper configuration yourself, because the parameters are
   affected not only by the ReplicationEndpoint but also by the shipper side.
   
   For example, to reduce the pressure of recording the offset, you should
   increase the record interval, i.e., increase the batch size, increase the
   number of ships between offset recordings, etc. To reduce the pressure on
   memory and on the target receiver, you should decrease the batch size. For an
   S3-based replication endpoint there is also a trade-off: increasing the flush
   interval gives better performance and fewer files on S3, but failover becomes
   more complicated because you need to restart from much further back.
   
   So this belongs in the documentation. Just exposing some configuration from
   ReplicationEndpoint cannot handle all of the above situations.
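   
   To make the trade-off concrete, here is a minimal sketch of shipper-side
   limits that decide when to record the offset. Everything in it is hypothetical
   (`OffsetRecordPolicy`, its fields, and its methods are not HBase APIs); it only
   illustrates that raising any limit means fewer offset records but more data to
   replay on failover.

```java
// Hypothetical sketch: these names are illustrative, not HBase APIs.
final class OffsetRecordPolicy {
  private final long maxStagedBytes;   // size limit between offset records
  private final long maxIntervalMs;    // time limit between offset records
  private final int maxShipsPerRecord; // ship-count limit between offset records

  private long stagedBytes;      // bytes shipped since the last record
  private int shipsSinceRecord;  // batches shipped since the last record
  private long lastRecordTs;     // time of the last record, in milliseconds

  OffsetRecordPolicy(long maxStagedBytes, long maxIntervalMs, int maxShipsPerRecord,
      long nowMs) {
    this.maxStagedBytes = maxStagedBytes;
    this.maxIntervalMs = maxIntervalMs;
    this.maxShipsPerRecord = maxShipsPerRecord;
    this.lastRecordTs = nowMs;
  }

  /** Called after each batch has been handed to the endpoint. */
  void onShipped(long batchBytes) {
    stagedBytes += batchBytes;
    shipsSinceRecord++;
  }

  /** True once any limit is reached, provided something was shipped since the last record. */
  boolean shouldRecord(long nowMs) {
    if (shipsSinceRecord == 0) {
      return false;
    }
    return stagedBytes >= maxStagedBytes
      || shipsSinceRecord >= maxShipsPerRecord
      || nowMs - lastRecordTs >= maxIntervalMs;
  }

  /** Called after the offset has been recorded; resets all counters. */
  void onRecorded(long nowMs) {
    stagedBytes = 0;
    shipsSinceRecord = 0;
    lastRecordTs = nowMs;
  }
}
```

   With limits like these on the shipper side, an S3-style endpoint would be
   tuned with larger values and a low-latency endpoint with smaller ones, without
   the endpoint having to expose anything.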



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -98,6 +104,13 @@ public final void run() {
     LOG.info("Running ReplicationSourceShipper Thread for wal group: {}", this.walGroupId);
     // Loop until we close down
     while (isActive()) {
+      // check if flush needed for WAL backup, this is need for timeout based flush

Review Comment:
   I should point out that this is not designed for WAL backup only. Here in the
   shipper, we just follow the configured limit, i.e., time-based or size-based,
   to determine whether we need to record the log position. There is a callback
   to the ReplicationEndpoint before recording, which the endpoint can use to do
   whatever it needs; the shipper does not know the details.
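   
   A rough sketch of that division of responsibility, under the assumption of a
   simple ship-count limit. All names here (`SketchEndpoint`,
   `BufferingSketchEndpoint`, `SketchShipper`, `beforeRecordOffset`) are made up
   for illustration and are not the actual HBase interfaces:

```java
// Hypothetical sketch: these types are illustrative, not the HBase interfaces.
interface SketchEndpoint {
  void replicate(String entry);

  /** Invoked by the shipper right before it records the offset. */
  void beforeRecordOffset();
}

/** An endpoint that buffers entries and flushes them only in the callback. */
final class BufferingSketchEndpoint implements SketchEndpoint {
  final java.util.List<String> buffered = new java.util.ArrayList<>();
  final java.util.List<String> flushed = new java.util.ArrayList<>();

  @Override
  public void replicate(String entry) {
    buffered.add(entry);
  }

  @Override
  public void beforeRecordOffset() {
    // Make everything durable before the shipper advances the offset.
    flushed.addAll(buffered);
    buffered.clear();
  }
}

/** A shipper that only follows its own limit and knows nothing about buffering. */
final class SketchShipper {
  private final SketchEndpoint endpoint;
  private final int shipsPerRecord; // assumed limit: ships between offset records
  private int shipsSinceRecord;
  private long recordedOffset = -1; // -1 means no offset recorded yet

  SketchShipper(SketchEndpoint endpoint, int shipsPerRecord) {
    this.endpoint = endpoint;
    this.shipsPerRecord = shipsPerRecord;
  }

  void ship(String entry, long offset) {
    endpoint.replicate(entry);
    shipsSinceRecord++;
    if (shipsSinceRecord >= shipsPerRecord) {
      endpoint.beforeRecordOffset(); // endpoint-specific work happens here
      recordedOffset = offset;
      shipsSinceRecord = 0;
    }
  }

  long recordedOffset() {
    return recordedOffset;
  }
}
```

   The shipper never asks whether the endpoint buffers. It only guarantees that
   the callback runs before the offset moves forward.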



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java:
##########
@@ -229,6 +246,53 @@ private void shipEdits(WALEntryBatch entryBatch) {
     }
   }
 
+  private boolean shouldPersistLogPosition() {
+    ReplicationEndpoint endpoint = source.getReplicationEndpoint();
+    if (!endpoint.isBufferedReplicationEndpoint()) {
+      // Non-buffering endpoints persist immediately
+      return true;
+    }
+    if (stagedWalSize == 0 || lastShippedBatch == null) {
+      return false;
+    }
+    return stagedWalSize >= endpoint.getMaxBufferSize()
+      || (EnvironmentEdgeManager.currentTime() - lastStagedFlushTs >= endpoint.maxFlushInterval());
+  }
+
+  @Nullable
+  // Returns IOException instead of throwing so callers can decide
+  // whether to retry (shipEdits) or best-effort log (run()).
+  private IOException persistLogPosition() {

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
