This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 3b556e8fc6efce81f8cfdfe18389962ae28754c8
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Sep 19 02:57:51 2022 -0700

    Check if channel closed before processing read request (#3486)
    
    * Check if channel closed before processing read request
    
    * Add missed call to onReadRequestFinish()
    
    * Fix test
    
    * Mock more tests
    
    Co-authored-by: Nicolò Boschi <[email protected]>
    (cherry picked from commit 1313b8e2e7964a0bbf2e221f49fe30a1fd812d31)
---
 .../main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java  | 7 +++++++
 .../java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java     | 7 +++++++
 .../org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java    | 1 +
 .../apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java  | 1 +
 .../java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java   | 1 +
 .../java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java  | 1 +
 .../org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java     | 1 +
 7 files changed, 19 insertions(+)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 7c8c2c8493..3373bf8ffd 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -61,6 +61,13 @@ class ReadEntryProcessor extends 
PacketProcessorBase<ReadRequest> {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received new read request: {}", request);
         }
+        if (!channel.isOpen()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping read request for closed channel: {}", 
channel);
+            }
+            requestProcessor.onReadRequestFinish();
+            return;
+        }
         int errorCode = BookieProtocol.EOK;
         long startTimeNanos = MathUtils.nowInNano();
         ByteBuf data = null;
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index e03cb35fa5..75a10df2ae 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -249,6 +249,13 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
     public void safeRun() {
         
requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
             MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+        if (!channel.isOpen()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping read request for closed channel: {}", 
channel);
+            }
+            requestProcessor.onReadRequestFinish();
+            return;
+        }
 
         if (!isVersionCompatible()) {
             ReadResponse readResponse = ReadResponse.newBuilder()
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 6c6eea7adb..54460770f1 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -72,6 +72,7 @@ public class ForceLedgerProcessorV3Test {
                 .build())
             .build();
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
index 393d5ddf33..9eae1b9c0d 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -83,6 +83,7 @@ public class LongPollReadEntryProcessorV3Test {
             .build();
 
         Channel channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         Bookie bookie = mock(Bookie.class);
 
         BookieRequestProcessor requestProcessor = 
mock(BookieRequestProcessor.class);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index 4d18645b90..114d614b28 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -58,6 +58,7 @@ public class ReadEntryProcessorTest {
     @Before
     public void setup() throws IOException, BookieException {
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index e2c141b741..150e0c3089 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -67,6 +67,7 @@ public class WriteEntryProcessorTest {
             new byte[0],
             Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 3c81d73a5e..2175b26a5b 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -77,6 +77,7 @@ public class WriteEntryProcessorV3Test {
                 .build())
             .build();
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);

Reply via email to