This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new b859649e6c3 HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
b859649e6c3 is described below
commit b859649e6c36c5d9558ef7c54e928fb8241bb485
Author: Duo Zhang <[email protected]>
AuthorDate: Sat Oct 7 11:35:37 2023 +0800
HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
Signed-off-by: GeorryHuang <[email protected]>
Signed-off-by: Xiaolin Ha <[email protected]>
(cherry picked from commit 6455c49239a4eeb966a4f4d9afbffc9610e6d394)
---
.../regionserver/ReplicationSource.java | 50 ++++++++++++----------
1 file changed, 27 insertions(+), 23 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3598f06ecdc..aebdccdc92d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -344,12 +344,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
ReplicationSourceShipper worker = createNewShipper(walGroupId);
ReplicationSourceWALReader walReader =
createNewWALReader(walGroupId, worker.getStartPosition());
- Threads.setDaemonThreadRunning(
- walReader, Thread.currentThread().getName() +
".replicationSource.wal-reader."
- + walGroupId + "," + queueId,
- (t, e) -> this.uncaughtException(t, e, this.manager,
this.getPeerId()));
+ Threads.setDaemonThreadRunning(walReader,
Thread.currentThread().getName()
+ + ".replicationSource.wal-reader." + walGroupId + "," + queueId,
this::retryRefreshing);
worker.setWALReader(walReader);
- worker.startup((t, e) -> this.uncaughtException(t, e, this.manager,
this.getPeerId()));
+ worker.startup(this::retryRefreshing);
return worker;
}
});
@@ -422,24 +420,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
return walEntryFilter;
}
- private void uncaughtException(Thread t, Throwable e,
ReplicationSourceManager manager,
- String peerId) {
- RSRpcServices.exitIfOOME(e);
- LOG.error("Unexpected exception in {} currentPath={}", t.getName(),
getCurrentPath(), e);
+ // log the error, check if the error is OOME, or whether we should abort the
server
+ private void checkError(Thread t, Throwable error) {
+ RSRpcServices.exitIfOOME(error);
+ LOG.error("Unexpected exception in {} currentPath={}", t.getName(),
getCurrentPath(), error);
if (abortOnError) {
- server.abort("Unexpected exception in " + t.getName(), e);
+ server.abort("Unexpected exception in " + t.getName(), error);
}
- if (manager != null) {
- while (true) {
- try {
- LOG.info("Refreshing replication sources now due to previous error
on thread: {}",
- t.getName());
- manager.refreshSources(peerId);
- break;
- } catch (IOException e1) {
- LOG.error("Replication sources refresh failed.", e1);
- sleepForRetries("Sleeping before try refreshing sources again",
maxRetriesMultiplier);
- }
+ }
+
+ private void retryRefreshing(Thread t, Throwable error) {
+ checkError(t, error);
+ while (true) {
+ if (server.isAborted() || server.isStopped() || server.isStopping()) {
+ LOG.warn("Server is shutting down, give up refreshing source for peer
{}", getPeerId());
+ return;
+ }
+ try {
+ LOG.info("Refreshing replication sources now due to previous error on
thread: {}",
+ t.getName());
+ manager.refreshSources(getPeerId());
+ break;
+ } catch (Exception e) {
+ LOG.error("Replication sources refresh failed.", e);
+ sleepForRetries("Sleeping before try refreshing sources again",
maxRetriesMultiplier);
}
}
}
@@ -613,7 +617,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
// keep looping in this thread until initialize eventually succeeds,
// while the server main startup one can go on with its work.
sourceRunning = false;
- uncaughtException(t, e, null, null);
+ checkError(t, e);
retryStartup.set(!this.abortOnError);
do {
if (retryStartup.get()) {
@@ -624,7 +628,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
initialize();
} catch (Throwable error) {
setSourceStartupStatus(false);
- uncaughtException(t, error, null, null);
+ checkError(t, error);
retryStartup.set(!this.abortOnError);
}
}