Apache9 commented on a change in pull request #2255:
URL: https://github.com/apache/hbase/pull/2255#discussion_r472636537
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
##########
@@ -290,7 +290,22 @@ private boolean updateLogPosition(WALEntryBatch batch) {
public void startup(UncaughtExceptionHandler handler) {
String name = Thread.currentThread().getName();
Threads.setDaemonThreadRunning(this,
- name + ".replicationSource.shipper" + walGroupId + "," +
source.getQueueId(), handler);
+ name + ".replicationSource.shipper" + walGroupId + "," +
source.getQueueId(),
+ (t,e) -> {
Review comment:
OK, the code is almost the same... Then I think we could move the logic
into the uncaughtException method? If abortOnError is true, we abort; otherwise
we will try to refresh the source.
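
To make the suggestion concrete, here is a minimal sketch of a handler that branches on the flag inside uncaughtException itself. The class name AbortOrRefreshSketch and the recorded "abort"/"refresh" strings are hypothetical stand-ins; the real abort and refresh calls in ReplicationSource are not shown in this diff and are not reproduced here.

```java
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a replication source; this is not the HBase class.
class AbortOrRefreshSketch implements UncaughtExceptionHandler {
  private final boolean abortOnError;
  // Records which branch the handler took, in place of real side effects.
  final List<String> actions = new ArrayList<>();

  AbortOrRefreshSketch(boolean abortOnError) {
    this.abortOnError = abortOnError;
  }

  @Override
  public void uncaughtException(Thread t, Throwable e) {
    if (abortOnError) {
      // Assumed placeholder for the existing abort path.
      actions.add("abort");
    } else {
      // Assumed placeholder for refreshing the source so it can retry.
      actions.add("refresh");
    }
  }
}
```

With the branch living in one method, both the shipper and the source could keep passing the same handler reference instead of duplicating a lambda at each call site.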
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -373,7 +389,21 @@ private void tryStartNewShipper(String walGroupId,
PriorityBlockingQueue<Path> q
Threads.setDaemonThreadRunning(
walReader, Thread.currentThread().getName()
+ ".replicationSource.wal-reader." + walGroupId + "," +
queueId,
- this::uncaughtException);
+ (t,e) -> {
Review comment:
So here it is for the WAL reader. I think calling refreshSources and retrying
is an acceptable way. Then let's just test the abortOnError flag here? If it is
true, we will call uncaughtException; otherwise we will try to refresh the
replication source.
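
A rough sketch of testing the flag at the call site, using a plain java.lang.Thread rather than HBase's Threads helper. WalReaderHandlerSketch, refreshSource and lastAction are hypothetical names for illustration only; the real refresh call is whatever the PR uses.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; names are illustrative and not taken from HBase.
class WalReaderHandlerSketch {
  final AtomicReference<String> lastAction = new AtomicReference<>("none");
  private final boolean abortOnError;

  WalReaderHandlerSketch(boolean abortOnError) {
    this.abortOnError = abortOnError;
  }

  // Stand-in for the existing abort path.
  void uncaughtException(Thread t, Throwable e) {
    lastAction.set("abort");
  }

  // Stand-in for refreshing the replication source so it can retry.
  void refreshSource() {
    lastAction.set("refresh");
  }

  // Runs 'body' on a daemon thread whose handler tests the flag,
  // waits for the thread to finish, and reports which branch ran.
  String runReader(Runnable body) {
    Thread reader = new Thread(body, "wal-reader-sketch");
    reader.setDaemon(true);
    reader.setUncaughtExceptionHandler((t, e) -> {
      if (abortOnError) {
        uncaughtException(t, e);
      } else {
        refreshSource();
      }
    });
    reader.start();
    try {
      reader.join();
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
    }
    return lastAction.get();
  }
}
```

The handler runs on the dying reader thread before it terminates, so the value returned after join() reflects the branch that was taken.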
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
+ this.startupOngoing.set(false);
}
@Override
public void startup() {
// mark we are running now
this.sourceRunning = true;
- initThread = new Thread(this::initialize);
- Threads.setDaemonThreadRunning(initThread,
- Thread.currentThread().getName() + ".replicationSource," + this.queueId,
- this::uncaughtException);
+ this.retryStartup.set(true);
Review comment:
This flag is only used in this method? Let's use a local var instead of
a class member field?
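
One way this could look, as a simplified synchronous sketch: a local AtomicBoolean can still be captured by a lambda, so the retry flag does not need to be a field. StartupRetrySketch, the attempt callback and the maxAttempts cap are assumptions made for illustration; the PR's actual startup() starts a thread and is not reproduced here.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical sketch of a retry loop whose flag is local to the method.
class StartupRetrySketch {
  // Calls 'attempt' until it finishes without requesting a retry,
  // or until maxAttempts is reached. Returns the number of attempts made.
  static int startup(Consumer<Runnable> attempt, int maxAttempts) {
    // Local variable instead of a class member; lambdas may capture it
    // because the reference itself is never reassigned.
    final AtomicBoolean retry = new AtomicBoolean(true);
    int attempts = 0;
    while (retry.get() && attempts < maxAttempts) {
      retry.set(false);
      attempts++;
      // The attempt receives a callback it can run to ask for another try.
      attempt.accept(() -> retry.set(true));
    }
    return attempts;
  }
}
```

The cap on attempts is only there to keep the sketch from looping forever; whether the real code should bound its retries is a separate question.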
##########
File path:
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -583,16 +617,27 @@ private void initialize() {
PriorityBlockingQueue<Path> queue = entry.getValue();
tryStartNewShipper(walGroupId, queue);
}
+ this.startupOngoing.set(false);
}
@Override
public void startup() {
// mark we are running now
this.sourceRunning = true;
- initThread = new Thread(this::initialize);
- Threads.setDaemonThreadRunning(initThread,
- Thread.currentThread().getName() + ".replicationSource," + this.queueId,
- this::uncaughtException);
+ this.retryStartup.set(true);
+ do {
+ if(retryStartup.get()) {
+ retryStartup.set(false);
+ startupOngoing.set(true);
Review comment:
So this one is exactly the same as source.isActive? Can we just make
use of that flag instead of introducing a new one?