This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 70bbc3830f Fix the deadlock when only using io thread to handle
request (#3480)
70bbc3830f is described below
commit 70bbc3830f07691cbdb87fe03958196fafc0effb
Author: Yong Zhang <[email protected]>
AuthorDate: Mon Sep 12 18:49:32 2022 +0800
Fix the deadlock when only using io thread to handle request (#3480)
* Fix the deadlock when only using io thread to handle request
---
*Motivation*
If user don't configure the ReadWorker thread pool, the reqeust will
process with io thread. We cannot call await() from an IO thread,
if the socket buffer is full, that blocking call would cause a deadlock.
*Modification*
- only wait the promise when the thread is not io thread
* Fix the style issue
---
.../bookkeeper/proto/PacketProcessorBase.java | 6 +-
.../client/BookieRecoveryUseIOThreadTest.java | 77 ++++++++++++++++++++++
2 files changed, 82 insertions(+), 1 deletion(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
index be979ba093..07954d746a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PacketProcessorBase.java
@@ -18,6 +18,7 @@
package org.apache.bookkeeper.proto;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.proto.BookieProtocol.Request;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -136,7 +137,10 @@ abstract class PacketProcessorBase<T extends Request>
extends SafeRunnable {
*/
protected void sendResponseAndWait(int rc, Object response, OpStatsLogger
statsLogger) {
try {
- channel.writeAndFlush(response).await();
+ ChannelFuture future = channel.writeAndFlush(response);
+ if (!channel.eventLoop().inEventLoop()) {
+ future.await();
+ }
} catch (InterruptedException e) {
return;
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryUseIOThreadTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryUseIOThreadTest.java
new file mode 100644
index 0000000000..fa1dba0cda
--- /dev/null
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieRecoveryUseIOThreadTest.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.client;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.conf.ClientConfiguration;
+import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class BookieRecoveryUseIOThreadTest extends BookKeeperClusterTestCase {
+
+ public BookieRecoveryUseIOThreadTest() {
+ super(1);
+ }
+
+ @Override
+ public void setUp() throws Exception {
+ baseConf.setNumAddWorkerThreads(0);
+ baseConf.setNumReadWorkerThreads(0);
+ baseConf.setNumHighPriorityWorkerThreads(0);
+ super.setUp();
+ }
+
+ @Test
+ public void testRecoveryClosedLedger() throws BKException, IOException,
InterruptedException {
+ // test the v2 protocol when using IO thread to handle the request
+ ClientConfiguration conf = new ClientConfiguration();
+ conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
+ conf.setUseV2WireProtocol(true);
+ AtomicInteger finalRc = new AtomicInteger(Integer.MAX_VALUE);
+ CountDownLatch latch = new CountDownLatch(1);
+ try (BookKeeper bkc = new BookKeeper(conf)) {
+ bkc.asyncCreateLedger(1, 1, BookKeeper.DigestType.CRC32,
"".getBytes(),
+ new AsyncCallback.CreateCallback() {
+ @Override
+ public void createComplete(int rc, LedgerHandle lh, Object
ctx) {
+ lh.asyncAddEntry("hello".getBytes(), new
AsyncCallback.AddCallback() {
+ @Override
+ public void addComplete(int rc, LedgerHandle lh,
long entryId, Object ctx) {
+ if (rc == BKException.Code.OK) {
+ bkc.asyncOpenLedger(lh.ledgerId,
BookKeeper.DigestType.CRC32, "".getBytes(),
+ new AsyncCallback.OpenCallback() {
+ @Override
+ public void openComplete(int rc,
LedgerHandle lh, Object ctx) {
+ finalRc.set(rc);
+ latch.countDown();
+ }
+ }, null);
+ }
+ }
+ }, null);
+ }
+ }, null);
+ latch.await();
+ }
+ Assert.assertEquals(finalRc.get(),
org.apache.bookkeeper.client.api.BKException.Code.OK);
+ }
+}