anmolnar commented on code in PR #7617:
URL: https://github.com/apache/hbase/pull/7617#discussion_r3016292503
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java:
##########
@@ -866,4 +866,24 @@ public long getTotalReplicatedEdits() {
   long getSleepForRetries() {
     return sleepForRetries;
   }
+
+  void restartShipper(String walGroupId, ReplicationSourceShipper oldWorker) {
+    workerThreads.compute(walGroupId, (key, current) -> {
+      if (current != oldWorker) {
+        return current; // already replaced
Review Comment:
How is this possible? When would `current` differ from `oldWorker` at this point?
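
One scenario where the check could fire, as a minimal sketch only: `restartShipper` is called twice for the same old worker (for example from two failure paths), so the second call finds the entry already replaced. The class `RestartGuardSketch`, the `Worker` type, and the `restart` method below are hypothetical stand-ins, not code from this PR or from HBase; only `ConcurrentHashMap.compute` is a real API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class RestartGuardSketch {
  /** Hypothetical stand-in for ReplicationSourceShipper. */
  static final class Worker {
    final int id;

    Worker(int id) {
      this.id = id;
    }
  }

  // Hypothetical stand-in for the workerThreads map.
  static final ConcurrentHashMap<String, Worker> workers = new ConcurrentHashMap<>();
  // Counts how many replacement workers were actually created.
  static final AtomicInteger created = new AtomicInteger();

  /** Replaces the worker only if the map still holds the instance the caller saw fail. */
  static Worker restart(String walGroupId, Worker oldWorker) {
    return workers.compute(walGroupId, (key, current) -> {
      if (current != oldWorker) {
        return current; // someone else already replaced it, keep their worker
      }
      return new Worker(created.incrementAndGet());
    });
  }

  public static void main(String[] args) {
    Worker first = new Worker(0);
    workers.put("group-1", first);

    // Two restart requests that both refer to the same failed worker.
    Worker afterFirstCall = restart("group-1", first);
    Worker afterSecondCall = restart("group-1", first);

    System.out.println(afterFirstCall != first);           // prints true
    System.out.println(afterSecondCall == afterFirstCall); // prints true
    System.out.println(created.get());                     // prints 1
  }
}
```

If nothing in the PR can issue a second restart for the same worker, the guard is dead code and could be dropped or replaced by an assertion.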
##########
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java:
##########
@@ -866,4 +866,24 @@ public long getTotalReplicatedEdits() {
   long getSleepForRetries() {
     return sleepForRetries;
   }
+
+  void restartShipper(String walGroupId, ReplicationSourceShipper oldWorker) {
+    workerThreads.compute(walGroupId, (key, current) -> {
+      if (current != oldWorker) {
+        return current; // already replaced
+      }
+
+      LOG.warn("Restarting shipper for walGroupId={}", walGroupId);
+
+      try {
+        ReplicationSourceShipper newWorker = createNewShipper(walGroupId);
Review Comment:
You might want to reuse `tryStartNewShipper(walGroupId)` here.