This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch ds-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit f260ff4876ba27ca088d2624942b56cf5c32a575
Author: pradeepbn <[email protected]>
AuthorDate: Wed Dec 1 09:37:28 2021 -0800

    Reorder the sequence of the bookkeeper server shutdown (#2888)
    
    Reorders the sequence of the bookkeeper server shutdown
    so that any in-progress reads or writes don't hit ledger
    storage after it has been shutdown. Now the request processor
    is shutdown before the bookie.
    
    An additional check if the channel is active is performed in
    the packet processor callbacks before sending response
    to avoid RejectedExecutionException messages within
    Netty from polluting the log.
    
    (cherry picked from commit 7395bb48e2ff41fac93a84ff8bfa9097bd0af8fe)
    (cherry picked from commit f8eb20db466ac434a7402539d0e3d4c362e57e0f)
---
 .../bookkeeper/proto/BookieRequestProcessor.java   |  3 ++
 .../org/apache/bookkeeper/proto/BookieServer.java  |  2 +-
 .../bookkeeper/proto/PacketProcessorBase.java      |  7 +++-
 .../bookkeeper/proto/PacketProcessorBaseV3.java    | 40 ++++++++++++----------
 .../proto/ForceLedgerProcessorV3Test.java          |  1 +
 .../bookkeeper/proto/WriteEntryProcessorTest.java  |  1 +
 .../proto/WriteEntryProcessorV3Test.java           |  1 +
 7 files changed, 35 insertions(+), 20 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
index 67f83e9ce5..902e2c1b41 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieRequestProcessor.java
@@ -264,6 +264,7 @@ public class BookieRequestProcessor implements 
RequestProcessor {
 
     @Override
     public void close() {
+        LOG.info("Closing RequestProcessor");
         shutdownExecutor(writeThreadPool);
         shutdownExecutor(readThreadPool);
         if (serverCfg.getNumLongPollWorkerThreads() > 0 || readThreadPool == 
null) {
@@ -271,6 +272,7 @@ public class BookieRequestProcessor implements 
RequestProcessor {
         }
         shutdownExecutor(highPriorityThreadPool);
         requestTimer.stop();
+        LOG.info("Closed RequestProcessor");
     }
 
     private OrderedExecutor createExecutor(
@@ -295,6 +297,7 @@ public class BookieRequestProcessor implements 
RequestProcessor {
     private void shutdownExecutor(OrderedExecutor service) {
         if (null != service) {
             service.shutdown();
+            service.forceShutdown(10, TimeUnit.SECONDS);
         }
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
index ee83327b2f..69aece207c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieServer.java
@@ -222,8 +222,8 @@ public class BookieServer {
         if (!running) {
             return;
         }
-        exitCode = bookie.shutdown();
         this.requestProcessor.close();
+        exitCode = bookie.shutdown();
         running = false;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index 4cc7176ede..d416b9f141 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -66,7 +66,12 @@ abstract class PacketProcessorBase<T extends Request> 
extends SafeRunnable {
     }
 
     protected void sendResponse(int rc, Object response, OpStatsLogger 
statsLogger) {
-        channel.writeAndFlush(response, channel.voidPromise());
+        if (channel.isActive()) {
+            channel.writeAndFlush(response, channel.voidPromise());
+        } else {
+            LOGGER.debug("Netty channel {} is inactive, "
+                    + "hence bypassing netty channel writeAndFlush during 
sendResponse", channel);
+        }
         if (BookieProtocol.EOK == rc) {
             
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
         } else {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
index 15765a252b..d4ad65ba43 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBaseV3.java
@@ -87,25 +87,29 @@ public abstract class PacketProcessorBaseV3 extends 
SafeRunnable {
                 requestProcessor.invalidateBlacklist(channel);
             }
         }
-
-        channel.writeAndFlush(response).addListener(new 
ChannelFutureListener() {
-            @Override
-            public void operationComplete(ChannelFuture future) throws 
Exception {
-                long writeElapsedNanos = MathUtils.elapsedNanos(writeNanos);
-                if (!future.isSuccess()) {
-                    requestProcessor.getRequestStats().getChannelWriteStats()
-                        .registerFailedEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
-                } else {
-                    requestProcessor.getRequestStats().getChannelWriteStats()
-                        .registerSuccessfulEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
-                }
-                if (StatusCode.EOK == code) {
-                    
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
-                } else {
-                    
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
+        if (channel.isActive()) {
+            channel.writeAndFlush(response).addListener(new 
ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws 
Exception {
+                    long writeElapsedNanos = 
MathUtils.elapsedNanos(writeNanos);
+                    if (!future.isSuccess()) {
+                        
requestProcessor.getRequestStats().getChannelWriteStats()
+                                .registerFailedEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
+                    } else {
+                        
requestProcessor.getRequestStats().getChannelWriteStats()
+                                .registerSuccessfulEvent(writeElapsedNanos, 
TimeUnit.NANOSECONDS);
+                    }
+                    if (StatusCode.EOK == code) {
+                        
statsLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
+                    } else {
+                        
statsLogger.registerFailedEvent(MathUtils.elapsedNanos(enqueueNanos), 
TimeUnit.NANOSECONDS);
+                    }
                 }
-            }
-        });
+            });
+        } else {
+            LOGGER.debug("Netty channel {} is inactive, "
+                    + "hence bypassing netty channel writeAndFlush during 
sendResponse", channel);
+        }
     }
 
     protected boolean isVersionCompatible() {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index bab83fb326..1b07fbb4d1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -76,6 +76,7 @@ public class ForceLedgerProcessorV3Test {
         when(requestProcessor.getBookie()).thenReturn(bookie);
         
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getRequestStats()).thenReturn(new 
RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.isActive()).thenReturn(true);
         processor = new ForceLedgerProcessorV3(
             request,
             channel,
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index bbcffea08c..27a4306a6e 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -68,6 +68,7 @@ public class WriteEntryProcessorTest {
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
         when(requestProcessor.getRequestStats()).thenReturn(new 
RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.isActive()).thenReturn(true);
         processor = WriteEntryProcessor.create(
             request,
             channel,
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 292dc519ca..7abaa100c8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -81,6 +81,7 @@ public class WriteEntryProcessorV3Test {
         when(requestProcessor.getBookie()).thenReturn(bookie);
         
when(requestProcessor.getWaitTimeoutOnBackpressureMillis()).thenReturn(-1L);
         when(requestProcessor.getRequestStats()).thenReturn(new 
RequestStats(NullStatsLogger.INSTANCE));
+        when(channel.isActive()).thenReturn(true);
         processor = new WriteEntryProcessorV3(
             request,
             channel,

Reply via email to