This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 5997963  Added BlockingQueue implementation based on JCtools
5997963 is described below

commit 5997963d5b68d6fc6d341352fb40562eb92dab2b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 14 15:12:57 2018 -0800

    Added BlockingQueue implementation based on JCtools
    
    ### Motivation
    
    Add a `BlockingQueue` implementation that is suitable for low latency and 
low contention.
    
    Key points:
     * Optimized for multiple producers and single consumer
     * When waiting for entries, the thread is blocked with busy wait to avoid 
context switch.
    
    (This will be used in subsequent PRs to optionally enable it)
    
    Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo 
<[email protected]>
    
    This closes #1682 from merlimat/blocking-queue
---
 bookkeeper-common/pom.xml                          |   4 +
 .../common/collections/BlockingMpscQueue.java      | 160 +++++++++++++++++++++
 .../bookkeeper/common/collections/BusyWait.java    |  65 +++++++++
 .../common/collections/BlockingMpscQueueTest.java  | 112 +++++++++++++++
 .../src/main/resources/LICENSE-all.bin.txt         |   2 +
 .../src/main/resources/LICENSE-bkctl.bin.txt       |   2 +
 .../src/main/resources/LICENSE-server.bin.txt      |   2 +
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |  11 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  12 +-
 .../bookkeeper/bookie/LedgerDescriptorImpl.java    |  29 ++--
 .../bookkeeper/proto/WriteLacProcessorV3.java      |   5 +
 .../bookkeeper/client/BookKeeperCloseTest.java     |   4 +-
 .../client/ParallelLedgerRecoveryTest.java         |   4 +-
 pom.xml                                            |   8 ++
 14 files changed, 394 insertions(+), 26 deletions(-)

diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 4b8b7e1..8946335 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -51,6 +51,10 @@
       <artifactId>jackson-annotations</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.jctools</groupId>
+      <artifactId>jctools-core</artifactId>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
       <scope>provided</scope>
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
new file mode 100644
index 0000000..619c0ed
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.jctools.queues.MpscArrayQueue;
+
+/**
+ * Blocking queue optimized for multiple producers and single consumer.
+ */
+public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements 
BlockingQueue<T> {
+
+    public BlockingMpscQueue(int size) {
+        super(size);
+    }
+
+    @Override
+    public void put(T e) throws InterruptedException {
+        while (!this.relaxedOffer(e)) {
+            // Do busy-spin loop
+            if (Thread.interrupted()) {
+                throw new InterruptedException();
+            }
+        }
+    }
+
+    /**
+     * Inserts the element, busy-spinning for up to {@code timeout} while the queue has no free slot.
+     *
+     * @param e the element to add
+     * @param timeout how long to keep retrying before giving up, in units of {@code unit}
+     * @param unit the time unit of {@code timeout}
+     * @return {@code true} if the element was added, {@code false} if the timeout elapsed first
+     * @throws InterruptedException if the calling thread is found interrupted while retrying
+     */
+    @Override
+    public boolean offer(T e, long timeout, TimeUnit unit) throws InterruptedException {
+        final long timeoutNanos = unit.toNanos(timeout);
+        final long startTime = System.nanoTime();
+
+        while (!this.relaxedOffer(e)) {
+            // Do busy-spin loop
+
+            // Compare the elapsed time instead of an absolute "now + timeout" deadline: that sum
+            // overflows for very large timeouts (e.g. Long.MAX_VALUE) and would make the call
+            // give up immediately instead of waiting.
+            if (System.nanoTime() - startTime > timeoutNanos) {
+                return false;
+            }
+
+            if (Thread.interrupted()) {
+                throw new InterruptedException();
+            }
+        }
+
+        return true;
+    }
+
+    @Override
+    public T take() throws InterruptedException {
+        int idleCounter = 0;
+        while (true) {
+            T item = relaxedPoll();
+            if (item == null) {
+                if (Thread.interrupted()) {
+                    throw new InterruptedException();
+                }
+
+                idleCounter = WAIT_STRATEGY.idle(idleCounter);
+                continue;
+            }
+
+
+            return item;
+        }
+    }
+
+    /**
+     * Retrieves and removes the head of the queue, idling with {@code WAIT_STRATEGY} for up to
+     * {@code timeout} while the queue is empty.
+     *
+     * @param timeout how long to wait before giving up, in units of {@code unit}
+     * @param unit the time unit of {@code timeout}
+     * @return the head of the queue, or {@code null} if the timeout elapsed before an element
+     *         became available
+     * @throws InterruptedException if the calling thread is found interrupted while the queue is empty
+     */
+    @Override
+    public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+        final long timeoutNanos = unit.toNanos(timeout);
+        final long startTime = System.nanoTime();
+
+        int idleCounter = 0;
+        while (true) {
+            T item = relaxedPoll();
+            if (item != null) {
+                return item;
+            }
+
+            if (Thread.interrupted()) {
+                throw new InterruptedException();
+            }
+
+            // Compare the elapsed time instead of an absolute "now + timeout" deadline: that sum
+            // overflows for very large timeouts (e.g. Long.MAX_VALUE) and would make the call
+            // return null immediately instead of waiting.
+            if (System.nanoTime() - startTime > timeoutNanos) {
+                return null;
+            }
+
+            idleCounter = WAIT_STRATEGY.idle(idleCounter);
+        }
+    }
+
+    @Override
+    public int remainingCapacity() {
+        return capacity() - size();
+    }
+
+    /**
+     * Moves the elements currently available in this queue into {@code c}.
+     *
+     * <p>The returned count is the number of elements actually removed from this queue. It is
+     * tracked while draining rather than derived from {@code c.size()}, so it stays correct when
+     * the target collection de-duplicates elements or is modified by another thread.
+     *
+     * @param c the collection to transfer elements into
+     * @return the number of elements removed from this queue
+     * @throws NullPointerException if {@code c} is null
+     * @throws IllegalArgumentException if {@code c} is this queue
+     */
+    @Override
+    public int drainTo(Collection<? super T> c) {
+        if (c == null) {
+            throw new NullPointerException();
+        }
+        if (c == this) {
+            // Required by the BlockingQueue contract; draining into itself could never finish.
+            throw new IllegalArgumentException("Cannot drain a queue into itself");
+        }
+
+        // Single-element array so the lambda can update the running count.
+        final int[] drained = {0};
+
+        // The same object is both wait strategy and exit condition: the first idle() call
+        // marks the end, which makes keepRunning() return false.
+        final DrainStrategy ds = new DrainStrategy();
+        drain(item -> {
+            c.add(item);
+            drained[0]++;
+        }, ds, ds);
+        return drained[0];
+    }
+
+    @Override
+    public int drainTo(Collection<? super T> c, int maxElements) {
+        return drain(c::add, maxElements);
+    }
+
+    /**
+     * Wait strategy combined with exit condition, for draining the queue.
+     */
+    private static final class DrainStrategy implements WaitStrategy, 
ExitCondition {
+
+        boolean reachedEnd = false;
+
+        @Override
+        public boolean keepRunning() {
+            return !reachedEnd;
+        }
+
+        @Override
+        public int idle(int idleCounter) {
+            reachedEnd = true;
+            return idleCounter;
+        }
+
+    }
+
+    /**
+     * Waiting strategy that busy-spins: every idle iteration calls {@link BusyWait#onSpinWait()} and
+     * increments the idle counter. It never falls back to sleeping.
+     */
+    private static final WaitStrategy SPIN_STRATEGY = new WaitStrategy() {
+
+        @Override
+        public int idle(int idleCounter) {
+            BusyWait.onSpinWait();
+            return idleCounter + 1;
+        }
+    };
+
+    private static final WaitStrategy WAIT_STRATEGY = SPIN_STRATEGY;
+}
diff --git 
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BusyWait.java
 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BusyWait.java
new file mode 100644
index 0000000..b44a9f6
--- /dev/null
+++ 
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BusyWait.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Utility class to use "Thread.onSpinWait()" when available.
+ */
+@UtilityClass
+@Slf4j
+public class BusyWait {
+
+    /**
+     * If available (Java 9+), use intrinsic {@link Thread#onSpinWait} which 
will
+     * reduce CPU consumption during the wait, otherwise fallback to regular
+     * spinning.
+     */
+    public static void onSpinWait() {
+        if (ON_SPIN_WAIT != null) {
+            try {
+                ON_SPIN_WAIT.invokeExact();
+            } catch (Throwable t) {
+                // Ignore
+            }
+        }
+    }
+
+    private static final MethodHandle ON_SPIN_WAIT;
+
+    static {
+        MethodHandle handle = null;
+        try {
+            handle = MethodHandles.lookup().findStatic(Thread.class, 
"onSpinWait", MethodType.methodType(void.class));
+        } catch (Throwable t) {
+            // Ignore
+            if (log.isDebugEnabled()) {
+                log.debug("Unable to use 'onSpinWait' from JVM", t);
+            }
+        }
+
+        ON_SPIN_WAIT = handle;
+    }
+}
diff --git 
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BlockingMpscQueueTest.java
 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BlockingMpscQueueTest.java
new file mode 100644
index 0000000..b778001
--- /dev/null
+++ 
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BlockingMpscQueueTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BlockingMpscQueue}.
+ */
+public class BlockingMpscQueueTest {
+
+    @Test
+    public void basicTest() throws Exception {
+        final int size = 15;
+        BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+        for (int i = 0; i < size; i++) {
+            queue.put(i);
+
+            assertEquals(size - i, queue.remainingCapacity());
+        }
+
+        assertEquals(size, queue.size());
+
+        for (int i = 0; i < size; i++) {
+            Integer n = queue.take();
+            assertTrue(n != null);
+        }
+
+        assertEquals(0, queue.size());
+
+        Integer res = queue.poll(100, TimeUnit.MILLISECONDS);
+        assertNull(res);
+    }
+
+    @Test
+    public void testOffer() throws Exception {
+        final int size = 16;
+        BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+        for (int i = 0; i < size; i++) {
+            assertTrue(queue.offer(1, 100, TimeUnit.MILLISECONDS));
+        }
+
+        assertEquals(size, queue.size());
+
+        assertFalse(queue.offer(1, 100, TimeUnit.MILLISECONDS));
+        assertEquals(size, queue.size());
+    }
+
+    @Test
+    public void testDrain() throws Exception {
+        final int size = 10;
+        BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+        for (int i = 0; i < size; i++) {
+            queue.put(i);
+        }
+
+        List<Integer> list = new ArrayList<>(size);
+        queue.drainTo(list);
+
+        assertEquals(size, list.size());
+
+        assertEquals(0, queue.size());
+
+        Integer res = queue.poll(100, TimeUnit.MILLISECONDS);
+        assertNull(res);
+    }
+
+    @Test
+    public void testDrainWithLimit() throws Exception {
+        final int size = 10;
+        BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+        for (int i = 0; i < size; i++) {
+            queue.put(i);
+        }
+
+        List<Integer> list = new ArrayList<>();
+        queue.drainTo(list, 5);
+        assertEquals(5, list.size());
+
+        assertEquals(5, queue.size());
+    }
+}
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt 
b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index 175050d..a1863d1 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -314,6 +314,7 @@ Apache Software License, Version 2.
 - lib/org.inferred-freebuilder-1.14.9.jar [47]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [48]
 - lib/org.apache.yetus-audience-annotations-0.5.0.jar [49]
+- lib/org.jctools-jctools-core-2.1.2.jar [50]
 
 [1] Source available at 
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.9.7
 [2] Source available at 
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.9.7
@@ -363,6 +364,7 @@ Apache Software License, Version 2.
 [47] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [48] Source available at https://github.com/google/error-prone/tree/v2.1.2
 [49] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
+[50] Source available at https://github.com/JCTools/JCTools/tree/v2.1.2
 
 
 
------------------------------------------------------------------------------------
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt 
b/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
index 486a35a..692e9a0 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
@@ -258,6 +258,7 @@ Apache Software License, Version 2.
 - lib/org.inferred-freebuilder-1.14.9.jar [34]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [35]
 - lib/org.apache.yetus-audience-annotations-0.5.0.jar [36]
+- lib/org.jctools-jctools-core-2.1.2.jar [37]
 
 [1] Source available at 
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at 
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -287,6 +288,7 @@ Apache Software License, Version 2.
 [34] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [35] Source available at https://github.com/google/error-prone/tree/v2.1.2
 [36] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
+[37] Source available at https://github.com/JCTools/JCTools/tree/v2.1.2
 
 
------------------------------------------------------------------------------------
 lib/io.netty-netty-codec-4.1.31.Final.jar bundles some 3rd party dependencies
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt 
b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index 62b4f19..a8b5b88 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -278,6 +278,7 @@ Apache Software License, Version 2.
 - lib/org.inferred-freebuilder-1.14.9.jar [34]
 - lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [35]
 - lib/org.apache.yetus-audience-annotations-0.5.0.jar [36]
+- lib/org.jctools-jctools-core-2.1.2.jar [37]
 
 [1] Source available at 
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
 [2] Source available at 
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -315,6 +316,7 @@ Apache Software License, Version 2.
 [34] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
 [35] Source available at https://github.com/google/error-prone/tree/v2.1.2
 [36] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
+[37] Source available at https://github.com/JCTools/JCTools/tree/v2.1.2
 
 
------------------------------------------------------------------------------------
 lib/io.netty-netty-codec-4.1.31.Final.jar bundles some 3rd party dependencies
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index d0db80e..cd0b2f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1133,7 +1133,7 @@ public class Bookie extends BookieCriticalThread {
      */
     private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
                                   boolean ackBeforeSync, WriteCallback cb, 
Object ctx, byte[] masterKey)
-            throws IOException, BookieException {
+            throws IOException, BookieException, InterruptedException {
         long ledgerId = handle.getLedgerId();
         long entryId = handle.addEntry(entry);
 
@@ -1170,7 +1170,7 @@ public class Bookie extends BookieCriticalThread {
      * is not exposed to users.
      */
     public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx, 
byte[] masterKey)
-            throws IOException, BookieException {
+            throws IOException, BookieException, InterruptedException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
         int entrySize = 0;
@@ -1208,7 +1208,7 @@ public class Bookie extends BookieCriticalThread {
     }
 
     public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, 
Object ctx, byte[] masterKey)
-            throws IOException, BookieException {
+            throws IOException, InterruptedException, BookieException {
         try {
             long ledgerId = entry.getLong(entry.readerIndex());
             LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
@@ -1254,7 +1254,7 @@ public class Bookie extends BookieCriticalThread {
      * @throws BookieException.LedgerFencedException if the ledger is fenced
      */
     public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback 
cb, Object ctx, byte[] masterKey)
-            throws IOException, BookieException.LedgerFencedException, 
BookieException {
+            throws IOException, BookieException.LedgerFencedException, 
BookieException, InterruptedException {
         long requestNanos = MathUtils.nowInNano();
         boolean success = false;
         int entrySize = 0;
@@ -1313,7 +1313,8 @@ public class Bookie extends BookieCriticalThread {
      * This method is idempotent. Once a ledger is fenced, it can
      * never be unfenced. Fencing a fenced ledger has no effect.
      */
-    public SettableFuture<Boolean> fenceLedger(long ledgerId, byte[] 
masterKey) throws IOException, BookieException {
+    public SettableFuture<Boolean> fenceLedger(long ledgerId, byte[] masterKey)
+            throws IOException, BookieException {
         LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
         return handle.fenceAndLogInJournal(getJournal(ledgerId));
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 2568092..57f05c2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -870,27 +870,29 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
         }
     }
 
-    public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx) {
+    public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx)
+            throws InterruptedException {
         logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
     }
 
     /**
      * record an add entry operation in journal.
      */
-    public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx) {
+    public void logAddEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb, Object ctx)
+            throws InterruptedException {
         long ledgerId = entry.getLong(entry.readerIndex() + 0);
         long entryId = entry.getLong(entry.readerIndex() + 8);
         logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
     }
 
     @VisibleForTesting
-    void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
-                     boolean ackBeforeSync, WriteCallback cb, Object ctx) {
+    void logAddEntry(long ledgerId, long entryId, ByteBuf entry, boolean 
ackBeforeSync, WriteCallback cb, Object ctx)
+            throws InterruptedException {
         //Retain entry until it gets written to journal
         entry.retain();
 
         journalQueueSize.inc();
-        queue.add(QueueEntry.create(
+        queue.put(QueueEntry.create(
                 entry, ackBeforeSync,  ledgerId, entryId, cb, ctx, 
MathUtils.nowInNano(),
                 journalAddEntryStats, journalQueueSize));
     }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index a0f34ea..ee91ed0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -122,18 +122,23 @@ public class LedgerDescriptorImpl extends 
LedgerDescriptor {
             result = logFenceResult = SettableFuture.create();
         }
         ByteBuf entry = createLedgerFenceEntry(ledgerId);
-        journal.logAddEntry(entry, false /* ackBeforeSync */, (rc, ledgerId, 
entryId, addr, ctx) -> {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Record fenced state for ledger {} in journal with 
rc {}",
-                        ledgerId, BKException.codeLogger(rc));
-            }
-            if (rc == 0) {
-                fenceEntryPersisted.compareAndSet(false, true);
-                result.set(true);
-            } else {
-                result.set(false);
-            }
-        }, null);
+        try {
+            journal.logAddEntry(entry, false /* ackBeforeSync */, (rc, 
ledgerId, entryId, addr, ctx) -> {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Record fenced state for ledger {} in journal 
with rc {}",
+                            ledgerId, BKException.codeLogger(rc));
+                }
+                if (rc == 0) {
+                    fenceEntryPersisted.compareAndSet(false, true);
+                    result.set(true);
+                } else {
+                    result.set(false);
+                }
+            }, null);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            result.setException(e);
+        }
         return result;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 2f018ff..7e42a73 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -111,6 +111,11 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3 
implements Runnable {
             logger.error("Error saving lac {} for ledger:{}",
                     lac, ledgerId, e);
             status = StatusCode.EIO;
+        } catch (InterruptedException  e) {
+            Thread.currentThread().interrupt();
+            logger.error("Interrupted while saving lac {} for ledger:{}",
+                    lac, ledgerId, e);
+            status = StatusCode.EIO;
         } catch (BookieException e) {
             logger.error("Unauthorized access to ledger:{} while adding 
lac:{}",
                     ledgerId, lac, e);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 850fe5d..eda68c8 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -77,7 +77,7 @@ public class BookKeeperCloseTest extends 
BookKeeperClusterTestCase {
                 @Override
                 public void recoveryAddEntry(ByteBuf entry, WriteCallback cb,
                                              Object ctx, byte[] masterKey)
-                        throws IOException, BookieException {
+                        throws IOException, BookieException, 
InterruptedException {
                     try {
                         Thread.sleep(5000);
                     } catch (InterruptedException ie) {
@@ -91,7 +91,7 @@ public class BookKeeperCloseTest extends 
BookKeeperClusterTestCase {
                 @Override
                 public void addEntry(ByteBuf entry, boolean ackBeforeSync, 
WriteCallback cb,
                                      Object ctx, byte[] masterKey)
-                        throws IOException, BookieException {
+                        throws IOException, BookieException, 
InterruptedException {
                     try {
                         Thread.sleep(5000);
                     } catch (InterruptedException ie) {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index fab13ed..5ba73c6 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -515,8 +515,8 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         }
 
         @Override
-        public void addEntry(ByteBuf entry, boolean ackBeforeSync, final 
WriteCallback cb,
-                             Object ctx, byte[] masterKey) throws IOException, 
BookieException {
+        public void addEntry(ByteBuf entry, boolean ackBeforeSync, final 
WriteCallback cb, Object ctx, byte[] masterKey)
+                throws IOException, BookieException, InterruptedException {
             super.addEntry(entry, ackBeforeSync, new WriteCallback() {
                 @Override
                 public void writeComplete(int rc, long ledgerId, long entryId,
diff --git a/pom.xml b/pom.xml
index 9903f5a..8f151b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
     <twitter-server.version>1.29.0</twitter-server.version>
     <vertx.version>3.4.1</vertx.version>
     <zookeeper.version>3.4.13</zookeeper.version>
+    <jctools.version>2.1.2</jctools.version>
     <!-- plugin dependencies -->
     <apache-rat-plugin.version>0.12</apache-rat-plugin.version>
     <cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
@@ -584,6 +585,13 @@
         <version>${jetty.version}</version>
       </dependency>
 
+      <!-- JCTools -->
+      <dependency>
+        <groupId>org.jctools</groupId>
+        <artifactId>jctools-core</artifactId>
+        <version>${jctools.version}</version>
+      </dependency>
+
       <!-- stats dependencies -->
       <!-- dropwizard metrics -->
       <dependency>

Reply via email to