This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new b859649e6c3 HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
b859649e6c3 is described below

commit b859649e6c36c5d9558ef7c54e928fb8241bb485
Author: Duo Zhang <[email protected]>
AuthorDate: Sat Oct 7 11:35:37 2023 +0800

    HBASE-28129 Do not retry refreshSources when region server is already stopping (#5453)
    
    Signed-off-by: GeorryHuang <[email protected]>
    Signed-off-by: Xiaolin Ha <[email protected]>
    (cherry picked from commit 6455c49239a4eeb966a4f4d9afbffc9610e6d394)
---
 .../regionserver/ReplicationSource.java            | 50 ++++++++++++----------
 1 file changed, 27 insertions(+), 23 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3598f06ecdc..aebdccdc92d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -344,12 +344,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
         ReplicationSourceShipper worker = createNewShipper(walGroupId);
         ReplicationSourceWALReader walReader =
           createNewWALReader(walGroupId, worker.getStartPosition());
-        Threads.setDaemonThreadRunning(
-          walReader, Thread.currentThread().getName() + 
".replicationSource.wal-reader."
-            + walGroupId + "," + queueId,
-          (t, e) -> this.uncaughtException(t, e, this.manager, 
this.getPeerId()));
+        Threads.setDaemonThreadRunning(walReader, 
Thread.currentThread().getName()
+          + ".replicationSource.wal-reader." + walGroupId + "," + queueId, 
this::retryRefreshing);
         worker.setWALReader(walReader);
-        worker.startup((t, e) -> this.uncaughtException(t, e, this.manager, 
this.getPeerId()));
+        worker.startup(this::retryRefreshing);
         return worker;
       }
     });
@@ -422,24 +420,30 @@ public class ReplicationSource implements ReplicationSourceInterface {
     return walEntryFilter;
   }
 
-  private void uncaughtException(Thread t, Throwable e, 
ReplicationSourceManager manager,
-    String peerId) {
-    RSRpcServices.exitIfOOME(e);
-    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), 
getCurrentPath(), e);
+  // log the error, check if the error is OOME, or whether we should abort the 
server
+  private void checkError(Thread t, Throwable error) {
+    RSRpcServices.exitIfOOME(error);
+    LOG.error("Unexpected exception in {} currentPath={}", t.getName(), 
getCurrentPath(), error);
     if (abortOnError) {
-      server.abort("Unexpected exception in " + t.getName(), e);
+      server.abort("Unexpected exception in " + t.getName(), error);
     }
-    if (manager != null) {
-      while (true) {
-        try {
-          LOG.info("Refreshing replication sources now due to previous error 
on thread: {}",
-            t.getName());
-          manager.refreshSources(peerId);
-          break;
-        } catch (IOException e1) {
-          LOG.error("Replication sources refresh failed.", e1);
-          sleepForRetries("Sleeping before try refreshing sources again", 
maxRetriesMultiplier);
-        }
+  }
+
+  private void retryRefreshing(Thread t, Throwable error) {
+    checkError(t, error);
+    while (true) {
+      if (server.isAborted() || server.isStopped() || server.isStopping()) {
+        LOG.warn("Server is shutting down, give up refreshing source for peer 
{}", getPeerId());
+        return;
+      }
+      try {
+        LOG.info("Refreshing replication sources now due to previous error on 
thread: {}",
+          t.getName());
+        manager.refreshSources(getPeerId());
+        break;
+      } catch (Exception e) {
+        LOG.error("Replication sources refresh failed.", e);
+        sleepForRetries("Sleeping before try refreshing sources again", 
maxRetriesMultiplier);
       }
     }
   }
@@ -613,7 +617,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
         // keep looping in this thread until initialize eventually succeeds,
         // while the server main startup one can go on with its work.
         sourceRunning = false;
-        uncaughtException(t, e, null, null);
+        checkError(t, e);
         retryStartup.set(!this.abortOnError);
         do {
           if (retryStartup.get()) {
@@ -624,7 +628,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
               initialize();
             } catch (Throwable error) {
               setSourceStartupStatus(false);
-              uncaughtException(t, error, null, null);
+              checkError(t, error);
               retryStartup.set(!this.abortOnError);
             }
           }

Reply via email to