joshelser commented on a change in pull request #2191:
URL: https://github.com/apache/hbase/pull/2191#discussion_r487111250
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -626,14 +626,12 @@ public void terminate(String reason, Exception cause, boolean clearMetrics, bool
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- if(worker.entryReader != null) {
+ if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
- }
-
- for (ReplicationSourceShipper worker : workers) {
Review comment:
The semantics here have changed, but I'm not seeing any conversation that
indicates the change was intentional.
Before: we would ask all workers to stop, then wait for all of them to be
stopped, so the workers could shut down concurrently. Now, for each worker, we
request a stop and wait for it to be stopped before moving on to the next worker.
I don't _think_ this is a big deal, but wanted to call it out.
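
To make the difference concrete, here is a minimal, self-contained sketch of the two orderings. It is not the HBase code: `Worker`, `startWorkers`, `stopAllThenWait`, and `stopAndWaitEach` are hypothetical names, and each worker simply sleeps for a fixed time to simulate winding down.

```java
import java.util.ArrayList;
import java.util.List;

public class ShutdownOrderSketch {

  /** Hypothetical stand-in for a shipper thread; not the HBase class. */
  static class Worker extends Thread {
    private final long cleanupMillis;
    private volatile boolean running = true;

    Worker(long cleanupMillis) {
      this.cleanupMillis = cleanupMillis;
    }

    /** Only requests the stop; it does not wait for the thread to exit. */
    void stopWorker() {
      running = false;
    }

    @Override
    public void run() {
      try {
        while (running) {
          Thread.sleep(1);
        }
        Thread.sleep(cleanupMillis); // simulated time needed to wind down
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }

  static List<Worker> startWorkers(int count, long cleanupMillis) {
    List<Worker> workers = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      Worker w = new Worker(cleanupMillis);
      w.start();
      workers.add(w);
    }
    return workers;
  }

  private static void joinQuietly(Thread t) {
    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the flag set for the caller
    }
  }

  /** Two loops: signal every worker, then wait, so workers wind down concurrently. */
  static long stopAllThenWait(List<Worker> workers) {
    long start = System.nanoTime();
    for (Worker w : workers) {
      w.stopWorker();
    }
    for (Worker w : workers) {
      joinQuietly(w);
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  /** One loop: signal and wait per worker, so the wind-down times add up. */
  static long stopAndWaitEach(List<Worker> workers) {
    long start = System.nanoTime();
    for (Worker w : workers) {
      w.stopWorker();
      joinQuietly(w);
    }
    return (System.nanoTime() - start) / 1_000_000;
  }

  public static void main(String[] args) {
    System.out.println("stop all, then wait: " + stopAllThenWait(startWorkers(4, 50)) + " ms");
    System.out.println("stop and wait each:  " + stopAndWaitEach(startWorkers(4, 50)) + " ms");
  }
}
```

With four workers that each need about 50 ms to wind down, the single-loop variant should take roughly four times as long as the two-loop variant. Whether that matters in practice depends on how many workers there are and how long each takes to stop.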
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the shipper,
+ * but the shipper didn't manage to ship those because the replication source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
Could just `break` instead of interrupting this thread.
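
A rough sketch of the `break` variant, outside of HBase: judging only from the quoted hunk, setting the interrupt flag in the timeout branch does not by itself leave the `while` loop, because `Thread.sleep` is only reached in the `else` branch. The names `awaitTermination`, `timeoutMillis`, and `pollMillis` below are hypothetical, and the boolean return value is an assumption about how a caller might decide whether to skip the buffer cleanup.

```java
public class WaitWithDeadlineSketch {

  /**
   * Polls until the target thread dies or the deadline passes.
   * Returns true if the thread stopped in time, false on timeout or interruption.
   */
  static boolean awaitTermination(Thread target, long timeoutMillis, long pollMillis) {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (target.isAlive()) {
      if (System.currentTimeMillis() >= deadline) {
        // Timed out: leave the loop and let the caller decide what to skip.
        break;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag for the caller
        break;
      }
    }
    return !target.isAlive();
  }

  public static void main(String[] args) {
    Thread slow = new Thread(() -> {
      try {
        Thread.sleep(5_000);
      } catch (InterruptedException e) {
        // asked to stop early; just exit
      }
    });
    slow.setDaemon(true);
    slow.start();

    boolean stopped = awaitTermination(slow, 100, 10);
    System.out.println("stopped in time: " + stopped);
    slow.interrupt();
  }
}
```

The loop exits exactly once per call, either because the thread died, the deadline passed, or the waiting thread was interrupted, so the warning would be logged at most once.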
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update <code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the shipper,
+ * but the shipper didn't manage to ship those because the replication source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads. Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad = entryReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+
+ source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
Review comment:
Thanks for updating the decrement logic.
Maybe add a TRACE log message indicating the amount of buffer reclaimed as
part of shutting down. That sounds like it might be helpful in the future, e.g.
`LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers", totalToDecrement.longValue())`
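
As an illustration only, the drain-sum-log-decrement sequence could look roughly like the sketch below. It uses hypothetical stand-ins rather than the HBase types: each batch is modelled as a list of entry sizes in bytes, the queue is drained with `poll()` instead of the `forEach`/`remove` pair from the patch, and `java.util.logging` at `FINEST` stands in for the TRACE-level call so the example runs with the JDK alone.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;

public class BufferReclaimSketch {
  private static final Logger LOG = Logger.getLogger(BufferReclaimSketch.class.getName());

  /**
   * Drains every pending batch and subtracts the summed entry sizes from the
   * shared counter. Returns the number of bytes reclaimed.
   */
  static long reclaim(BlockingQueue<List<Long>> pendingBatches, AtomicLong totalBufferUsed) {
    long reclaimed = 0;
    List<Long> batch;
    // poll() removes the head of the queue, or returns null once it is empty.
    while ((batch = pendingBatches.poll()) != null) {
      for (long entrySize : batch) {
        reclaimed += entrySize;
      }
    }
    // FINEST is the closest java.util.logging level to TRACE.
    LOG.log(Level.FINEST,
        "Decrementing totalBufferUsed by {0}B while stopping Replication WAL Readers",
        reclaimed);
    totalBufferUsed.addAndGet(-reclaimed);
    return reclaimed;
  }

  public static void main(String[] args) {
    BlockingQueue<List<Long>> queue = new LinkedBlockingQueue<>();
    queue.add(List.of(100L, 200L));
    queue.add(List.of(50L));
    AtomicLong totalBufferUsed = new AtomicLong(1000);

    long reclaimed = reclaim(queue, totalBufferUsed);
    // prints "350 bytes reclaimed, 650 still in use"
    System.out.println(reclaimed + " bytes reclaimed, " + totalBufferUsed.get() + " still in use");
  }
}
```

Logging the reclaimed amount before the decrement keeps the message and the counter update next to each other, which may make a later mismatch in buffer accounting easier to trace.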
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -626,14 +626,12 @@ public void terminate(String reason, Exception cause,
boolean clearMetrics, bool
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- if(worker.entryReader != null) {
+ if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
- }
-
- for (ReplicationSourceShipper worker : workers) {
Review comment:
Semantics of this have changed, but I'm not seeing conversation that
indicates that it was intentional.
Before: we would stop all workers, then wait for them all to be stopped.
Each worker could stop itself concurrently. Now, for each worker, we request a
stop and then wait for it to be stopped, then move on to the next worker.
I don't _think_ this is a big deal, but wanted to call it out.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
Could just `break` instead of interrupting this thread.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad =
entryReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+
+
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
Review comment:
Thanks for updating the decrement logic.
Maybe a TRACE log message to indicate the amount of buffer reclaimed as a
part of shutting down. Sounds like that might be helpful in the future. e.g.
`LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication
WAL Readers", totalToDecrement.longValue())`
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -626,14 +626,12 @@ public void terminate(String reason, Exception cause,
boolean clearMetrics, bool
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- if(worker.entryReader != null) {
+ if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
- }
-
- for (ReplicationSourceShipper worker : workers) {
Review comment:
Semantics of this have changed, but I'm not seeing conversation that
indicates that it was intentional.
Before: we would stop all workers, then wait for them all to be stopped.
Each worker could stop itself concurrently. Now, for each worker, we request a
stop and then wait for it to be stopped, then move on to the next worker.
I don't _think_ this is a big deal, but wanted to call it out.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
Could just `break` instead of interrupting this thread.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad =
entryReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+
+
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
Review comment:
Thanks for updating the decrement logic.
Maybe a TRACE log message to indicate the amount of buffer reclaimed as a
part of shutting down. Sounds like that might be helpful in the future. e.g.
`LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication
WAL Readers", totalToDecrement.longValue())`
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -626,14 +626,12 @@ public void terminate(String reason, Exception cause,
boolean clearMetrics, bool
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- if(worker.entryReader != null) {
+ if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
- }
-
- for (ReplicationSourceShipper worker : workers) {
Review comment:
Semantics of this have changed, but I'm not seeing conversation that
indicates that it was intentional.
Before: we would stop all workers, then wait for them all to be stopped.
Each worker could stop itself concurrently. Now, for each worker, we request a
stop and then wait for it to be stopped, then move on to the next worker.
I don't _think_ this is a big deal, but wanted to call it out.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
Could just `break` instead of interrupting this thread.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad =
entryReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+
+
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
Review comment:
Thanks for updating the decrement logic.
Maybe a TRACE log message to indicate the amount of buffer reclaimed as a
part of shutting down. Sounds like that might be helpful in the future. e.g.
`LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication
WAL Readers", totalToDecrement.longValue())`
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -626,14 +626,12 @@ public void terminate(String reason, Exception cause,
boolean clearMetrics, bool
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- if(worker.entryReader != null) {
+ if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
- }
-
- for (ReplicationSourceShipper worker : workers) {
Review comment:
Semantics of this have changed, but I'm not seeing conversation that
indicates that it was intentional.
Before: we would stop all workers, then wait for them all to be stopped.
Each worker could stop itself concurrently. Now, for each worker, we request a
stop and then wait for it to be stopped, then move on to the next worker.
I don't _think_ this is a big deal, but wanted to call it out.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
Could just `break` instead of interrupting this thread.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad =
entryReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+
+
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
Review comment:
Thanks for updating the decrement logic.
Maybe a TRACE log message to indicate the amount of buffer reclaimed as a
part of shutting down. Sounds like that might be helpful in the future. e.g.
`LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication
WAL Readers", totalToDecrement.longValue())`
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -626,14 +626,12 @@ public void terminate(String reason, Exception cause,
boolean clearMetrics, bool
Threads.shutdown(initThread, this.sleepForRetries);
}
Collection<ReplicationSourceShipper> workers = workerThreads.values();
+
for (ReplicationSourceShipper worker : workers) {
worker.stopWorker();
- if(worker.entryReader != null) {
+ if (worker.entryReader != null) {
worker.entryReader.setReaderRunning(false);
}
- }
-
- for (ReplicationSourceShipper worker : workers) {
Review comment:
Semantics of this have changed, but I'm not seeing conversation that
indicates that it was intentional.
Before: we would stop all workers, then wait for them all to be stopped.
Each worker could stop itself concurrently. Now, for each worker, we request a
stop and then wait for it to be stopped, then move on to the next worker.
I don't _think_ this is a big deal, but wanted to call it out.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
Could just `break` instead of interrupting this thread.
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
+ } else {
+ // Wait both shipper and reader threads to stop
+ Thread.sleep(this.sleepForRetries);
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("{} Interrupted while waiting {} to stop on
clearWALEntryBatch: {}",
+ this.source.getPeerId(), this.getName(), e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ LongAccumulator totalToDecrement = new LongAccumulator((a,b) -> a + b, 0);
+ entryReader.entryBatchQueue.forEach(w -> {
+ entryReader.entryBatchQueue.remove(w);
+ w.getWalEntries().forEach(e -> {
+ long entrySizeExcludeBulkLoad =
entryReader.getEntrySizeExcludeBulkLoad(e);
+ totalToDecrement.accumulate(entrySizeExcludeBulkLoad);
+ });
+ });
+
+
source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue());
Review comment:
Thanks for updating the decrement logic.
Maybe add a TRACE log message to indicate the amount of buffer reclaimed as
part of shutting down. That sounds like it might be helpful in the future, e.g.
`LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication
WAL Readers", totalToDecrement.longValue())`
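To make the suggestion concrete, here is a minimal, stdlib-only sketch of the drain, sum, decrement, and report sequence. Everything in it is an assumption for illustration: `BufferReclaimSketch`, `drainAndSum` and `reclaim` are hypothetical names, a `List<Long>` of entry sizes stands in for a WAL entry batch, and `System.out.println` stands in for the `LOG.trace(...)` call. It also uses `drainTo` rather than the `forEach`/`remove` pair in the diff, purely to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for the PR's logic; none of these names are HBase APIs. */
class BufferReclaimSketch {

  /** Removes every pending batch from the queue and returns the summed entry sizes. */
  static long drainAndSum(LinkedBlockingQueue<List<Long>> batchQueue) {
    List<List<Long>> drained = new ArrayList<>();
    batchQueue.drainTo(drained);
    long total = 0;
    for (List<Long> batch : drained) {
      for (long entrySize : batch) {
        total += entrySize;
      }
    }
    return total;
  }

  /** Subtracts the reclaimed amount from the shared counter and reports it. */
  static long reclaim(LinkedBlockingQueue<List<Long>> batchQueue, AtomicLong totalBufferUsed) {
    long reclaimed = drainAndSum(batchQueue);
    long remaining = totalBufferUsed.addAndGet(-reclaimed);
    // Stand-in for the suggested LOG.trace(...) call.
    System.out.println("Decrementing totalBufferUsed by " + reclaimed
        + "B while stopping Replication WAL Readers");
    return remaining;
  }

  public static void main(String[] args) {
    LinkedBlockingQueue<List<Long>> queue = new LinkedBlockingQueue<>();
    queue.add(Arrays.asList(10L, 20L));
    queue.add(Arrays.asList(30L));
    AtomicLong totalBufferUsed = new AtomicLong(100L);
    System.out.println("remaining = " + reclaim(queue, totalBufferUsed));
  }
}
```

Logging the reclaimed total once, after the sum is complete, keeps the message to a single line per shutdown rather than one per batch.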
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -324,4 +326,51 @@ void stopWorker() {
public boolean isFinished() {
return state == WorkerState.FINISHED;
}
+
+ /**
+ * Attempts to properly update
<code>ReplicationSourceManager.totalBufferUser</code>,
+ * in case there were unprocessed entries batched by the reader to the
shipper,
+ * but the shipper didn't manage to ship those because the replication
source is being terminated.
+ * In that case, it iterates through the batched entries and decrease the
pending
+ * entries size from <code>ReplicationSourceManager.totalBufferUser</code>
+ * <p/>
+ * <b>NOTES</b>
+ * 1) This method should only be called upon replication source termination.
+ * It blocks waiting for both shipper and reader threads termination,
+ * to make sure no race conditions
+ * when updating <code>ReplicationSourceManager.totalBufferUser</code>.
+ *
+ * 2) It <b>does not</b> attempt to terminate reader and shipper threads.
Those <b>must</b>
+ * have been triggered interruption/termination prior to calling this method.
+ */
+ void clearWALEntryBatch() {
+ long timeout = System.currentTimeMillis() + this.shipEditsTimeout;
+ while(this.isAlive() || this.entryReader.isAlive()){
+ try {
+ if (System.currentTimeMillis() >= timeout) {
+ LOG.warn("Interrupting source thread for peer {} without cleaning
buffer usage "
+ + "because clearWALEntryBatch method timed out whilst waiting
reader/shipper "
+ + "thread to stop.", this.source.getPeerId());
+ Thread.currentThread().interrupt();
Review comment:
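Could just `break` instead of interrupting this thread. As written, once the
timeout passes the loop neither sleeps nor exits: it keeps logging and
re-interrupting until both threads stop.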
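A minimal sketch of what a `break`-based wait could look like, under stated assumptions: `BoundedWaitSketch` and `awaitStopped` are hypothetical names, and a `BooleanSupplier` stands in for the `this.isAlive() || this.entryReader.isAlive()` check. It is not the PR's code, only an illustration of leaving the loop on timeout without setting the caller's interrupt flag.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical illustration of a bounded wait; not the PR's implementation. */
class BoundedWaitSketch {

  /**
   * Polls until stillRunning returns false or timeoutMs elapses.
   * Returns true if the workers stopped in time, false on timeout or interruption.
   */
  static boolean awaitStopped(BooleanSupplier stillRunning, long timeoutMs, long sleepMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    boolean stopped = true;
    while (stillRunning.getAsBoolean()) {
      if (System.currentTimeMillis() >= deadline) {
        // Timed out: leave the loop rather than interrupting the calling thread.
        stopped = false;
        break;
      }
      try {
        Thread.sleep(sleepMs);
      } catch (InterruptedException e) {
        // A real interrupt from outside: restore the flag and stop waiting.
        Thread.currentThread().interrupt();
        stopped = false;
        break;
      }
    }
    return stopped;
  }

  public static void main(String[] args) {
    // A worker that never stops: the wait gives up after roughly 50 ms.
    System.out.println(awaitStopped(() -> true, 50L, 5L));
    // A worker that is already stopped: the wait returns immediately.
    System.out.println(awaitStopped(() -> false, 50L, 5L));
  }
}
```

Returning a boolean lets the caller decide whether it is safe to touch the shared counter; on timeout it can skip the cleanup instead of racing with a thread that is still running.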
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]