wchevreuil commented on a change in pull request #2191:
URL: https://github.com/apache/hbase/pull/2191#discussion_r484370875



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,39 @@ void stopWorker() {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decreases the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>.
+   * <p/>
+   * <b>NOTE</b> This method should only be called upon replication source termination.
+   * It blocks waiting for both the shipper and reader threads to terminate,
+   * to make sure there are no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   */
+  void clearWALEntryBatch() {
+    while (this.isAlive() || this.entryReader.isAlive()) {
+      try {
+        // Wait for both shipper and reader threads to stop
+        Thread.sleep(this.sleepForRetries);
+      } catch (InterruptedException e) {
+        LOG.info("{} Interrupted while waiting {} to stop on clearWALEntryBatch",

Review comment:
       > Is info the right level here? Maybe it is?
   
   We should probably raise it to WARN here, because an interruption means the source termination didn't complete properly, and we might not have "cleared" this source's pending entries size from the source manager buffer.
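A minimal sketch of the suggested handling, assuming plain `Thread` objects for the shipper and reader, and using `java.util.logging` at `WARNING` in place of HBase's SLF4J `LOG.warn`. The names `ClearBatchSketch` and `waitForWorkers` are hypothetical, and returning `false` on interrupt (so the caller knows the buffer may not have been cleared) is an illustrative choice, not necessarily what the PR does.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class ClearBatchSketch {
  private static final Logger LOG = Logger.getLogger(ClearBatchSketch.class.getName());

  /**
   * Hypothetical helper: polls until both worker threads have stopped.
   * Returns true once both are dead, or false if the calling thread is interrupted first.
   */
  static boolean waitForWorkers(Thread shipper, Thread reader, long sleepMs) {
    while (shipper.isAlive() || reader.isAlive()) {
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        // WARNING rather than INFO: termination did not complete, so this source's
        // pending entries size may never be subtracted from the shared buffer.
        LOG.log(Level.WARNING, "Interrupted while waiting for shipper and reader to stop", e);
        // Restore the interrupt flag so callers further up can still observe it.
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }
}
```

Restoring the interrupt flag keeps the interruption visible to whatever is driving the source shutdown, instead of silently swallowing it.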
   
   > But if we get interrupted, that means we could go on to do the update below in a racy way with the other threads, right?
   
   I don't think that would be the case. We may have multiple shippers for a source, but this method is only invoked from the isolated context of the source thread. When we are terminating multiple sources (due to recovered queues or multiple peers), the size of the entries to be decreased is still calculated in the context of each source thread.
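To illustrate the argument, here is a hypothetical sketch assuming the shared buffer counter is an `AtomicLong` and each terminating source passes in only its own unshipped entry sizes. The per-source sum lives in local variables confined to that source's thread, and the shared counter is touched once via `addAndGet(-size)`, so concurrent terminations of several sources cannot lose an update. `PerSourceAccountingSketch` and `releasePending` are illustrative names, not HBase APIs.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

class PerSourceAccountingSketch {
  // Hypothetical stand-in for the manager-wide buffer counter shared by all sources.
  static final AtomicLong totalBufferUsed = new AtomicLong();

  /**
   * Hypothetical per-source cleanup, called from one source's own thread during termination.
   * The sum is computed in local (thread-confined) state; only the final decrement touches
   * shared state, and it is a single atomic operation.
   */
  static long releasePending(List<Long> pendingEntrySizes) {
    long size = 0;
    for (long entrySize : pendingEntrySizes) {
      size += entrySize;
    }
    return totalBufferUsed.addAndGet(-size);
  }
}
```

Several sources can run this concurrently: each computes a different `size` from its own batch, and the atomic decrements commute, so the final counter value does not depend on interleaving.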




----------------------------------------------------------------
This is an automated message from the Apache Git Service.