ankitsinghal commented on a change in pull request #2546:
URL: https://github.com/apache/hbase/pull/2546#discussion_r520360311
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
Review comment:
It is best practice to avoid calling Collection.remove(Object) on a
collection while iterating over it. Some iterator implementations throw
ConcurrentModificationException in that scenario, so the code should not depend
on the collection's iterator implementation. Use Iterator.remove() instead.
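A minimal sketch of the Iterator.remove() pattern suggested above. It is not
the PR's code: the class name, the drainAndSum helper, and the use of a
List<Long> of entry sizes as a stand-in for a WAL entry batch are all
assumptions made for illustration.
```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

public class IteratorRemoveSketch {

  /**
   * Removes every batch from the queue through the iterator and returns the
   * sum of the entry sizes that were removed.
   * Hypothetical stand-in: each batch is just a list of entry sizes in bytes.
   */
  static long drainAndSum(Queue<List<Long>> batches) {
    long total = 0;
    Iterator<List<Long>> it = batches.iterator();
    while (it.hasNext()) {
      List<Long> batch = it.next();
      for (long size : batch) {
        total += size;
      }
      // Remove through the iterator rather than calling batches.remove(batch),
      // so the loop does not rely on how the collection's iterator reacts to
      // outside modification.
      it.remove();
    }
    return total;
  }

  public static void main(String[] args) {
    Queue<List<Long>> queue = new LinkedBlockingQueue<>();
    queue.add(Arrays.asList(10L, 20L));
    queue.add(Arrays.asList(5L));
    System.out.println(drainAndSum(queue)); // prints 35
    System.out.println(queue.isEmpty());    // prints true
  }
}
```
For a queue, a loop over poll() until it returns null would be another way to
drain without touching the iterator at all.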
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad =
entryReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+
+ LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication
WAL Readers.",
+ totalToDecrement.longValue());
+
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
Review comment:
We may also need to update the global metric (which tracks the memory used
for these edits) here:
```
long newBufferUsed =
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
```
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad =
entryReader.getEntrySizeExcludeBulkLoad(e);
Review comment:
nit: refer to the static method by its class name:
ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad()
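A small sketch of why the class-name form reads better. The Reader class and
entrySize method here are hypothetical stand-ins, not HBase code. Java resolves
a static call from the compile-time type, so the instance is never consulted
and the call even succeeds through a null reference.
```java
public class StaticCallSketch {

  // Hypothetical stand-in for a reader class exposing a static helper.
  static class Reader {
    static long entrySize(String entry) {
      return entry.length();
    }
  }

  public static void main(String[] args) {
    Reader reader = null;
    // Legal but misleading: looks like an instance call, yet no instance is
    // used, so there is no NullPointerException here.
    long viaInstance = reader.entrySize("abc");
    // Clearer: the class name makes it obvious that the method is static.
    long viaClass = Reader.entrySize("abc");
    System.out.println(viaInstance == viaClass); // prints true
  }
}
```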
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -325,4 +327,53 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
Why do we need an additional interrupt here when
ReplicationSource.terminate() has already interrupted the worker thread prior
to the clearWALEntryBatch method call?
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -684,17 +684,17 @@ public void terminate(String reason, Exception cause,
boolean clearMetrics,
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
- for (ReplicationSourceShipper worker : workers) {
- worker.stopWorker();
- if(worker.entryReader != null) {
- worker.entryReader.setReaderRunning(false);
- }
- }
+
if (this.replicationEndpoint != null) {
this.replicationEndpoint.stop();
}
+
for (ReplicationSourceShipper worker : workers) {
+ worker.stopWorker();
+ if (worker.entryReader != null) {
+ worker.entryReader.setReaderRunning(false);
Review comment:
If a worker is doing some async work when it is asked to stop, stopping
can take time. In that case I think we should keep the implementation as it
was before: ask all workers to stop at once, and then wait. Otherwise, if the
number of workers gets large due to a backlog and someone changes the wait time
config to tens of seconds, the removePeer command/procedure has to wait a long
time (number of workers * (sleep time + time for clearWALEntryBatch)) to
terminate the replication source.
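A rough sketch of the "ask all to stop, then wait" ordering described above.
The Worker class, its timings, and the terminateAll helper are hypothetical and
only illustrate the two-phase shape; they are not the ReplicationSource
implementation.
```java
import java.util.ArrayList;
import java.util.List;

public class StopAllThenWaitSketch {

  // Hypothetical worker: runs until asked to stop, then spends some time on
  // cleanup before its thread exits.
  static class Worker extends Thread {
    private volatile boolean running = true;

    void stopWorker() {
      running = false;
    }

    @Override
    public void run() {
      try {
        while (running) {
          Thread.sleep(20);
        }
        Thread.sleep(100); // simulated cleanup after the stop request
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Signals every worker first, then waits; returns elapsed milliseconds. */
  static long terminateAll(List<Worker> workers, long joinTimeoutMs) {
    long start = System.nanoTime();
    // Phase 1: request stop on all workers so their shutdowns overlap.
    for (Worker w : workers) {
      w.stopWorker();
    }
    // Phase 2: wait for each one. The total wait is bounded by roughly the
    // slowest worker rather than by the sum over all workers.
    try {
      for (Worker w : workers) {
        w.join(joinTimeoutMs);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) {
    List<Worker> workers = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      Worker w = new Worker();
      w.start();
      workers.add(w);
    }
    System.out.println("terminated in ~" + terminateAll(workers, 5000) + " ms");
  }
}
```
Stopping and waiting on each worker inside a single loop would instead
serialize the shutdowns, which is where the (number of workers * wait time)
cost comes from.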
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]