ankitsinghal commented on a change in pull request #2546:
URL: https://github.com/apache/hbase/pull/2546#discussion_r520999059



##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -684,17 +684,17 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
       Threads.shutdown(initThread, this.sleepForRetries);
     }
     Collection<ReplicationSourceShipper> workers = workerThreads.values();
-    for (ReplicationSourceShipper worker : workers) {
-      worker.stopWorker();
-      if(worker.entryReader != null) {
-        worker.entryReader.setReaderRunning(false);
-      }
-    }
+
 
     if (this.replicationEndpoint != null) {
       this.replicationEndpoint.stop();
     }
+
     for (ReplicationSourceShipper worker : workers) {
+      worker.stopWorker();
+      if (worker.entryReader != null) {
+        worker.entryReader.setReaderRunning(false);

Review comment:
   Sure, let me try to explain again.
   I was referring to restoring this loop:
   ```
   for (ReplicationSourceShipper worker : workers) {
     worker.stopWorker();
     if (worker.entryReader != null) {
       worker.entryReader.setReaderRunning(false);
     }
   }
   ```
   Your current flow stops the workers sequentially:
   * stop a worker
   * wait for that worker's thread to complete
   * stop the next worker
   * wait for it to finish
   * continue with the remaining workers

   So in the worst case you have to wait for
   (number of workers) * min(time taken by a worker to finish, timeout).

   By restoring the old loop, you parallelize the stopping of the workers:
   * first, ask all worker threads to finish their work by setting their state
   * then, in the second loop, wait for each worker to finish; while you are
     waiting for one worker, the others are completing their work in parallel
   * so by the time you are done with one worker, all the other workers may
     already be done as well
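
A minimal sketch of the two-phase stop described above, assuming hypothetical stand-in classes (`TwoPhaseStop`, `Worker`, `startWorkers`, `stopAll` are illustrative names, not the HBase classes). The first loop only flips each worker's flag; the second loop joins, so the total wait is roughly that of the slowest worker rather than the sum over all of them.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not the HBase classes.
class TwoPhaseStop {

  static class Worker extends Thread {
    private volatile boolean running = true;
    private final long cleanupMillis;

    Worker(long cleanupMillis) {
      this.cleanupMillis = cleanupMillis;
    }

    // Only flips a flag; it does not wait for the thread to exit.
    void stopWorker() {
      running = false;
    }

    @Override
    public void run() {
      try {
        while (running) {
          Thread.sleep(5);
        }
        Thread.sleep(cleanupMillis); // simulated wind-down work after the stop request
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  static List<Worker> startWorkers(int count, long cleanupMillis) {
    List<Worker> workers = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      Worker w = new Worker(cleanupMillis);
      w.start();
      workers.add(w);
    }
    return workers;
  }

  // Phase 1 signals every worker; phase 2 waits for each one in turn.
  // Returns the elapsed time in milliseconds.
  static long stopAll(List<Worker> workers, long joinTimeoutMillis) {
    long start = System.nanoTime();
    for (Worker w : workers) {
      w.stopWorker();
    }
    for (Worker w : workers) {
      try {
        w.join(joinTimeoutMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the flag and stop waiting
        break;
      }
    }
    return (System.nanoTime() - start) / 1_000_000L;
  }

  public static void main(String[] args) {
    List<Worker> workers = startWorkers(4, 200);
    long elapsed = stopAll(workers, 2_000);
    System.out.println("stopped " + workers.size() + " workers in ~" + elapsed + " ms");
  }
}
```

If `stopWorker()` and `join()` were called back to back inside a single loop, each worker's wind-down would only start after the previous one had finished.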
   
   

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTES</b>
+   * 1) This method should only be called upon replication source termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   *
+   * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+   * have been triggered interruption/termination prior to calling this method.
+   */
+  void clearWALEntryBatch() {
+    long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+    while(this.isAlive() || this.entryReader.isAlive()){
+      try {
+        if (System.currentTimeMillis() >= timeout) {
+          LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+            + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+            + "thread to stop.", this.source.getPeerId());
+          Thread.currentThread().interrupt();

Review comment:
       >   * 1) This method should only be called upon replication source termination.
   
   So what will this interrupt do, and how is it handled in the source?
   
   ```
   LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
               + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
               + "thread to stop.", this.source.getPeerId());
   ```
   Don't we need to return here, since we timed out and are not clearing the batch?
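
To illustrate the concern: in the quoted loop, `Thread.currentThread().interrupt()` only sets the interrupt flag, so the `while` condition is re-evaluated and, once the deadline has passed, the timeout branch is taken again on every iteration until both threads die. A rough sketch of the "return on timeout" alternative follows; `DeadlineWait`, `awaitStopped` and `sleeper` are hypothetical names used for illustration, not code from this PR.

```java
// Hypothetical helper for illustration only; not code from the pull request.
class DeadlineWait {

  // Returns true only if both threads stopped before the deadline.
  // On timeout or interruption it returns false so the caller can skip
  // any cleanup that is unsafe while those threads may still be running.
  static boolean awaitStopped(Thread a, Thread b, long timeoutMillis, long pollMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (a.isAlive() || b.isAlive()) {
      if (System.currentTimeMillis() >= deadline) {
        return false; // timed out: leave the loop instead of spinning
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the flag for the caller
        return false;
      }
    }
    return true;
  }

  // Starts a daemon thread that simply sleeps for the given time.
  static Thread sleeper(long millis) {
    Thread t = new Thread(() -> {
      try {
        Thread.sleep(millis);
      } catch (InterruptedException e) {
        // exit quietly
      }
    });
    t.setDaemon(true);
    t.start();
    return t;
  }

  public static void main(String[] args) {
    boolean stopped = awaitStopped(sleeper(20), sleeper(20), 1_000, 5);
    System.out.println(stopped ? "safe to clear the batch" : "timed out, skipping cleanup");
  }
}
```

With this shape the caller decides what to do when `false` comes back, for example logging the warning and leaving the buffer accounting untouched.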

##########
File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
   public boolean isFinished() {
     return state == WorkerState.FINISHED;
   }
+
+  /**
+   * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+   * in case there were unprocessed entries batched by the reader to the shipper,
+   * but the shipper didn't manage to ship those because the replication source is being terminated.
+   * In that case, it iterates through the batched entries and decrease the pending
+   * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+   * <p/>
+   * <b>NOTES</b>
+   * 1) This method should only be called upon replication source termination.
+   * It blocks waiting for both shipper and reader threads termination,
+   * to make sure no race conditions
+   * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+   *
+   * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+   * have been triggered interruption/termination prior to calling this method.
+   */
+  void clearWALEntryBatch() {
+    long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+    while(this.isAlive() || this.entryReader.isAlive()){
+      try {
+        if (System.currentTimeMillis() >= timeout) {
+          LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+            + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+            + "thread to stop.", this.source.getPeerId());
+          Thread.currentThread().interrupt();
+        } else {
+          // Wait both shipper and reader threads to stop
+          Thread.sleep(this.sleepForRetries);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}",
+          this.source.getPeerId(), this.getName(), e);
+        Thread.currentThread().interrupt();
+      }
+    }
+    LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+    entryReader.entryBatchQueue.forEach(w -> {
+      entryReader.entryBatchQueue.remove(w);

Review comment:
       BlockingQueue is also a Collection, so whether this is safe depends on
   whether the Iterator (used by forEach) of the particular blocking queue
   implementation allows structural changes while iterating.
   
   It seems the java.util.concurrent BlockingQueue implementations have only
   weakly consistent iterators, which don't throw ConcurrentModificationException
   and are guaranteed to traverse all elements as they existed at iterator
   creation, so we should be fine here.



