wchevreuil commented on a change in pull request #2191:
URL: https://github.com/apache/hbase/pull/2191#discussion_r484383560



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,39 @@ void stopWorker() {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
+
+  /**
+   * Attempts to properly update 
<code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the 
shipper,
+   * but the shipper didn't manage to ship those because the replication 
source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the 
pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTE</b> This method should be only called upon replication source 
termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   */
+  void clearWALEntryBatch() {
+    while(this.isAlive() || this.entryReader.isAlive()){
+      try {
+        // Wait both shipper and reader threads to stop
+        Thread.sleep(this.sleepForRetries);
+      } catch (InterruptedException e) {
+        LOG.info("{} Interrupted while waiting {} to stop on 
clearWALEntryBatch",
+          this.source.getPeerId(), this.getName());
+        Thread.currentThread().interrupt();
+      }
+    }

Review comment:
       That's a good point, we don't timeout these locks anywhere, AFICS. Let 
me put one here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to