ankitsinghal commented on a change in pull request #2546:
URL: https://github.com/apache/hbase/pull/2546#discussion_r520999059
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -684,17 +684,17 @@ public void terminate(String reason, Exception cause, boolean clearMetrics,
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
- for (ReplicationSourceShipper worker : workers) {
- worker.stopWorker();
- if(worker.entryReader != null) {
- worker.entryReader.setReaderRunning(false);
- }
- }
+
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
+
for (ReplicationSourceShipper worker : workers) {
+ worker.stopWorker();
+ if (worker.entryReader != null) {
+ worker.entryReader.setReaderRunning(false);
Review comment:
Sure, let me try to explain again.
I was referring to restoring this loop:
```
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
if(worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
}
```
Your current flow stops the workers sequentially:
* stop a worker
* wait for that worker thread to complete
* stop the next worker
* wait for it to finish
* continue with the others...
So in the worst case, you would have to wait for (number of workers) *
min(time taken by a worker to finish, timeout).
By restoring the old loop, you stop the workers in parallel:
* ask all worker threads to finish their work by setting their state
* then, in the second loop, wait for each worker to finish; while you are
waiting for one worker, the others are completing their work in parallel
* so by the time you are done with one worker, it is possible that all the
other workers are also done
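To illustrate the two-phase pattern, here is a minimal sketch. `DemoWorker` and `TwoPhaseShutdown` are hypothetical stand-ins I made up for this comment, not the HBase classes, and the simulated wind-down time is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ReplicationSourceShipper; not the HBase class.
class DemoWorker extends Thread {
  private volatile boolean running = true;
  private final long cleanupMillis;

  DemoWorker(long cleanupMillis) {
    this.cleanupMillis = cleanupMillis;
  }

  // Only flips a flag and returns immediately.
  void stopWorker() {
    running = false;
  }

  @Override
  public void run() {
    try {
      while (running) {
        Thread.sleep(5);
      }
      Thread.sleep(cleanupMillis); // simulated time needed to wind down
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

class TwoPhaseShutdown {
  // Returns the elapsed wall-clock time in milliseconds.
  static long stopAll(List<DemoWorker> workers, long joinTimeoutMillis) {
    long start = System.nanoTime();
    // Phase 1: signal every worker, without waiting for any of them.
    for (DemoWorker worker : workers) {
      worker.stopWorker();
    }
    // Phase 2: wait for each worker; the others wind down in the meantime.
    try {
      for (DemoWorker worker : workers) {
        worker.join(joinTimeoutMillis);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return (System.nanoTime() - start) / 1_000_000L;
  }

  public static void main(String[] args) {
    List<DemoWorker> workers = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      DemoWorker worker = new DemoWorker(200);
      worker.start();
      workers.add(worker);
    }
    System.out.println("elapsed ms: " + stopAll(workers, 2000));
  }
}
```

With four workers that each need about 200 ms to wind down, the total wait should be close to one wind-down period rather than four, because the wind-downs overlap.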
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the shipper,
+ * but the shipper didn't manage to ship those because the replication source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
> * 1) This method should only be called upon replication source
termination.
So what will this interrupt do, and how is it handled in the source?
```
LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+ "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ "thread to stop.", this.source.getPeerId());
```
Don't we need to return here, since we timed out and are not clearing the batch?
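Something along these lines is what I have in mind. This is only a sketch: `AwaitStop.awaitStopped` and its parameters are hypothetical names, not existing HBase methods. The caller would skip the buffer cleanup when it returns false.

```java
import java.util.concurrent.TimeUnit;

class AwaitStop {
  // Returns true only if both threads have stopped before the deadline.
  // Returns false on timeout or if the waiting thread is interrupted,
  // so the caller can bail out instead of looping again.
  static boolean awaitStopped(Thread shipper, Thread reader,
      long timeoutMillis, long pollMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (shipper.isAlive() || reader.isAlive()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out: leave the loop right away
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        return false;
      }
    }
    return true;
  }
}
```

The point of the early return is that the timeout branch no longer falls back into the `while` condition, which would otherwise keep logging and re-interrupting for as long as either thread is alive.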
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the shipper,
+ * but the shipper didn't manage to ship those because the replication source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
Review comment:
A BlockingQueue is also a Collection, so whether this is safe depends on
whether the Iterator (used by forEach) of the blocking queue implementation
allows structural changes while iterating.
It seems the BlockingQueue implementations only have weakly consistent
iterators, which don't throw ConcurrentModificationException and are guaranteed
to traverse all elements as they existed when the iterator was created, so we
should be fine here.
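A small standalone check of that behavior, assuming the queue is a `LinkedBlockingQueue` (I have not verified the concrete type of `entryBatchQueue` here). `DrainDemo` and the `Long` sizes are made up for illustration; they stand in for the batched entries.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;

class DrainDemo {
  // Mirrors the shape of the code in the diff: remove each element
  // from the queue while forEach is still traversing it.
  static long removeWhileIterating(BlockingQueue<Long> queue) {
    LongAccumulator total = new LongAccumulator(Long::sum, 0);
    queue.forEach(size -> {
      queue.remove(size); // structural change during traversal
      total.accumulate(size);
    });
    return total.get();
  }

  // Alternative that avoids iterating at all: poll until the queue is empty.
  static long drain(BlockingQueue<Long> queue) {
    long total = 0;
    Long size;
    while ((size = queue.poll()) != null) {
      total += size;
    }
    return total;
  }

  public static void main(String[] args) {
    BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
    queue.add(10L);
    queue.add(20L);
    queue.add(30L);
    System.out.println("total=" + removeWhileIterating(queue)
        + " remaining=" + queue.size());
  }
}
```

Both variants leave the queue empty and produce the same sum; the `poll()` loop is just an option if we would rather not rely on the iterator semantics.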