This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new b67c16a HBASE-26866 Shutdown WAL may abort region server (#4254)
b67c16a is described below
commit b67c16a7636958970d37bfcd775fd55e8de98177
Author: Duo Zhang <[email protected]>
AuthorDate: Wed Mar 23 14:53:58 2022 +0800
HBASE-26866 Shutdown WAL may abort region server (#4254)
Signed-off-by: Xiaolin Ha <[email protected]>
---
.../hbase/regionserver/wal/AbstractFSWAL.java | 25 +++++++++++++++++-----
1 file changed, 20 insertions(+), 5 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index de8a6af..5416e3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -48,6 +48,8 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -345,8 +347,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
protected final AtomicBoolean rollRequested = new AtomicBoolean(false);
- private final ExecutorService logArchiveOrShutdownExecutor =
Executors.newSingleThreadExecutor(
- new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-Or-Shutdown-%d").build());
+ // Run in caller if we get reject execution exception, to avoid aborting region server when we get
+ // reject execution exception. Usually this should not happen but let's make it more robust.
+ private final ExecutorService logArchiveExecutor =
+ new ThreadPoolExecutor(1, 1, 1L, TimeUnit.MINUTES, new
LinkedBlockingQueue<Runnable>(),
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archive-%d").build(),
+ new ThreadPoolExecutor.CallerRunsPolicy());
private final int archiveRetries;
@@ -770,7 +776,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
final List<Pair<Path, Long>> localLogsToArchive = logsToArchive;
// make it async
for (Pair<Path, Long> log : localLogsToArchive) {
- logArchiveOrShutdownExecutor.execute(() -> {
+ logArchiveExecutor.execute(() -> {
archive(log);
});
this.walFile2Props.remove(log.getFirst());
@@ -985,7 +991,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
}
}
- Future<Void> future = logArchiveOrShutdownExecutor.submit(new
Callable<Void>() {
+ ExecutorService shutdownExecutor = Executors.newSingleThreadExecutor(
+ new
ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Shutdown-%d").build());
+
+ Future<Void> future = shutdownExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
if (rollWriterLock.tryLock(walShutdownTimeout, TimeUnit.SECONDS)) {
@@ -1003,7 +1012,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return null;
}
});
- logArchiveOrShutdownExecutor.shutdown();
+ shutdownExecutor.shutdown();
try {
future.get(walShutdownTimeout, TimeUnit.MILLISECONDS);
@@ -1020,6 +1029,12 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
} else {
throw new IOException(e.getCause());
}
+ } finally {
+ // in shutdown we may call cleanOldLogs so shutdown this executor in the end.
+ // In sync replication implementation, we may shutdown a WAL without shutting down the whole
+ // region server, if we shutdown this executor earlier we may get reject execution exception
+ // and abort the region server
+ logArchiveExecutor.shutdown();
}
}