virajjasani commented on a change in pull request #2255:
URL: https://github.com/apache/hbase/pull/2255#discussion_r469904081
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +614,27 @@ private void initialize() {
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
+ this.startupOngoing.set(false);
}
@Override
public void startup() {
// mark we are running now
this.sourceRunning = true;
- initThread = new Thread(this::initialize);
- Threads.setDaemonThreadRunning(initThread,
- Thread.currentThread().getName() + ".replicationSource," + this.queueId,
- this::uncaughtException);
+ this.retryStartup.set(true);
+ do {
+ if(retryStartup.get()) {
+ retryStartup.set(false);
+ startupOngoing.set(true);
+ initThread = new Thread(this::initialize);
+ Threads.setDaemonThreadRunning(initThread,
+ Thread.currentThread().getName() + ".replicationSource," + this.queueId,
+ (t,e) -> {
+ uncaughtException(t, e);
+ retryStartup.set(true);
Review comment:
I assume that if we hit an uncaught exception here, we want to retry the
loop.
If so, should we also call `startupOngoing.set(true);` here explicitly, in
case `initialize()` has already set it to false?
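A rough sketch of what I mean, in case it helps. This is not the actual `ReplicationSource` code: `StartupRetrySketch`, `startInitThread`, `markInitFinished` and the two getters are hypothetical names made up for illustration, and only the two flags from the diff are modelled. It uses plain `Thread.setUncaughtExceptionHandler` rather than `Threads.setDaemonThreadRunning`, so the snippet stands alone.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for ReplicationSource; only the two flags are modelled.
class StartupRetrySketch {
  private final AtomicBoolean retryStartup = new AtomicBoolean(false);
  private final AtomicBoolean startupOngoing = new AtomicBoolean(false);

  // Mimics the last statement of initialize() in the diff.
  void markInitFinished() {
    startupOngoing.set(false);
  }

  boolean isRetryRequested() {
    return retryStartup.get();
  }

  boolean isStartupOngoing() {
    return startupOngoing.get();
  }

  // Starts a daemon init thread whose handler re-arms both flags on failure.
  Thread startInitThread(Runnable initialize) {
    startupOngoing.set(true);
    Thread initThread = new Thread(initialize);
    initThread.setDaemon(true);
    initThread.setUncaughtExceptionHandler((t, e) -> {
      // Re-assert startupOngoing first, in case the init code cleared it
      // before the exception escaped, then ask the startup loop to retry.
      startupOngoing.set(true);
      retryStartup.set(true);
    });
    initThread.start();
    return initThread;
  }
}
```

With the explicit `startupOngoing.set(true)` in the handler, a caller that sees `retryStartup` set should never also see `startupOngoing` as false, regardless of what the init code did before failing.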
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -35,6 +35,8 @@
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
Review comment:
nit: redundant
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -120,6 +122,13 @@
// ReplicationEndpoint which will handle the actual replication
private volatile ReplicationEndpoint replicationEndpoint;
+ private AtomicBoolean retryStartup = new AtomicBoolean(false);
+
+ private AtomicBoolean startupOngoing = new AtomicBoolean(false);
Review comment:
nit: can both of these fields be `final`?