This is an automated email from the ASF dual-hosted git repository. nicoloboschi pushed a commit to branch ds-4.14 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 99563890aa26851828be8e84b8ef09a86572d6e0 Author: Michael Marshall <[email protected]> AuthorDate: Mon Sep 19 02:57:51 2022 -0700 Check if channel closed before processing read request (#3486) * Check if channel closed before processing read request * Add missed call to onReadRequestFinish() * Fix test * Mock more tests Co-authored-by: Nicolò Boschi <[email protected]> (cherry picked from commit 1313b8e2e7964a0bbf2e221f49fe30a1fd812d31) --- .../main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java | 7 +++++++ .../java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java | 7 +++++++ .../org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java | 1 + .../apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java | 1 + .../java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java | 1 + .../java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java | 1 + .../org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java | 1 + 7 files changed, 19 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java index 60de0440c1..7647e4afef 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessor.java @@ -65,6 +65,13 @@ class ReadEntryProcessor extends PacketProcessorBase<ReadRequest> { if (LOG.isDebugEnabled()) { LOG.debug("Received new read request: {}", request); } + if (!channel.isOpen()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Dropping read request for closed channel: {}", channel); + } + requestProcessor.onReadRequestFinish(); + return; + } int errorCode = BookieProtocol.EOK; long startTimeNanos = MathUtils.nowInNano(); ByteBuf data = null; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java index a8ecc11d7e..6b3b624141 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/ReadEntryProcessorV3.java @@ -248,6 +248,13 @@ class ReadEntryProcessorV3 extends PacketProcessorBaseV3 { public void safeRun() { requestProcessor.getRequestStats().getReadEntrySchedulingDelayStats().registerSuccessfulEvent( MathUtils.elapsedNanos(enqueueNanos), TimeUnit.NANOSECONDS); + if (!channel.isOpen()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Dropping read request for closed channel: {}", channel); + } + requestProcessor.onReadRequestFinish(); + return; + } if (!isVersionCompatible()) { ReadResponse readResponse = ReadResponse.newBuilder() diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java index 1b07fbb4d1..ecb2676bc8 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ForceLedgerProcessorV3Test.java @@ -71,6 +71,7 @@ public class ForceLedgerProcessorV3Test { .build()) .build(); channel = mock(Channel.class); + when(channel.isOpen()).thenReturn(true); bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java index 393d5ddf33..9eae1b9c0d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3Test.java @@ -83,6 +83,7 @@ public class LongPollReadEntryProcessorV3Test { .build(); Channel channel = mock(Channel.class); + when(channel.isOpen()).thenReturn(true); Bookie bookie = mock(Bookie.class); BookieRequestProcessor requestProcessor = mock(BookieRequestProcessor.class); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java index fcd74b1705..1fd42fd16f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/ReadEntryProcessorTest.java @@ -60,6 +60,7 @@ public class ReadEntryProcessorTest { @Before public void setup() throws IOException, BookieException { channel = mock(Channel.class); + when(channel.isOpen()).thenReturn(true); bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java index a69245d4a9..0752c05c32 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorTest.java @@ -64,6 +64,7 @@ public class WriteEntryProcessorTest { new byte[0], Unpooled.wrappedBuffer("test-entry-data".getBytes(UTF_8))); channel = mock(Channel.class); + when(channel.isOpen()).thenReturn(true); bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java index 7abaa100c8..477d83bb2a 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/WriteEntryProcessorV3Test.java @@ -76,6 +76,7 @@ public class WriteEntryProcessorV3Test { .build()) .build(); channel = mock(Channel.class); + when(channel.isOpen()).thenReturn(true); bookie = mock(Bookie.class); requestProcessor = mock(BookieRequestProcessor.class); when(requestProcessor.getBookie()).thenReturn(bookie);
