This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 3b556e8fc6efce81f8cfdfe18389962ae28754c8
Author: Michael Marshall <[email protected]>
AuthorDate: Mon Sep 19 02:57:51 2022 -0700

    Check if channel closed before processing read request (#3486)

    * Check if channel closed before processing read request
    * Add missed call to onReadRequestFinish()
    * Fix test
    * Mock more tests

    Co-authored-by: Nicolò Boschi <[email protected]>
    (cherry picked from commit 1313b8e2e7964a0bbf2e221f49fe30a1fd812d31)
---
 .../main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java | 7 +++++++
 .../java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java | 7 +++++++
 .../org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java | 1 +
 .../apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java | 1 +
 .../java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java | 1 +
 .../java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java | 1 +
 .../org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java | 1 +
 7 files changed, 19 insertions(+)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
index 7c8c2c8493..3373bf8ffd 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java
@@ -61,6 +61,13 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Received new read request: {}", request);
         }
+        if (!channel.isOpen()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping read request for closed channel: {}", channel);
+            }
+            requestProcessor.onReadRequestFinish();
+            return;
+        }
         int errorCode = BookieProtocol.EOK;
         long startTimeNanos = MathUtils.nowInNano();
         ByteBuf data = null;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
index e03cb35fa5..75a10df2ae 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java
@@ -249,6 +249,13 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 {
     public void safeRun() {
         requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent(
                 MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS);
+        if (!channel.isOpen()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping read request for closed channel: {}", channel);
+            }
+            requestProcessor.onReadRequestFinish();
+            return;
+        }
 
         if (!isVersionCompatible()) {
             ReadResponse readResponse = ReadResponse.newBuilder()
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
index 6c6eea7adb..54460770f1 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java
@@ -72,6 +72,7 @@ public class ForceLedgerProcessorV3Test {
                 .build())
             .build();
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
index 393d5ddf33..9eae1b9c0d 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java
@@ -83,6 +83,7 @@ public class LongPollReadEntryProcessorV3Test {
             .build();
         Channel channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         Bookie bookie = mock(Bookie.class);
         BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
index 4d18645b90..114d614b28 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java
@@ -58,6 +58,7 @@ public class ReadEntryProcessorTest {
     @Before
     public void setup() throws IOException, BookieException {
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
index e2c141b741..150e0c3089 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java
@@ -67,6 +67,7 @@ public class WriteEntryProcessorTest {
             new byte[0],
             Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8)));
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
index 3c81d73a5e..2175b26a5b 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java
@@ -77,6 +77,7 @@ public class WriteEntryProcessorV3Test {
                 .build())
             .build();
         channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(true);
         bookie = mock(Bookie.class);
         requestProcessor = mock(BookieRequestProcessor.class);
         when(requestProcessor.getBookie()).thenReturn(bookie);
