This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5997963 Added BlockingQueue implementation based on JCtools
5997963 is described below
commit 5997963d5b68d6fc6d341352fb40562eb92dab2b
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 14 15:12:57 2018 -0800
Added BlockingQueue implementation based on JCtools
### Motivation
Add a `BlockingQueue` implementation that is suitable for low latency and
low contention.
Key points:
* Optimized for multiple producers and single consumer
* While waiting for entries, the consumer thread busy-waits instead of
sleeping, which avoids a context switch.
(Subsequent PRs will use this queue and make it an optional, opt-in feature.)
Reviewers: Enrico Olivelli <[email protected]>, Sijie Guo
<[email protected]>
This closes #1682 from merlimat/blocking-queue
---
bookkeeper-common/pom.xml | 4 +
.../common/collections/BlockingMpscQueue.java | 160 +++++++++++++++++++++
.../bookkeeper/common/collections/BusyWait.java | 65 +++++++++
.../common/collections/BlockingMpscQueueTest.java | 112 +++++++++++++++
.../src/main/resources/LICENSE-all.bin.txt | 2 +
.../src/main/resources/LICENSE-bkctl.bin.txt | 2 +
.../src/main/resources/LICENSE-server.bin.txt | 2 +
.../java/org/apache/bookkeeper/bookie/Bookie.java | 11 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 12 +-
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 29 ++--
.../bookkeeper/proto/WriteLacProcessorV3.java | 5 +
.../bookkeeper/client/BookKeeperCloseTest.java | 4 +-
.../client/ParallelLedgerRecoveryTest.java | 4 +-
pom.xml | 8 ++
14 files changed, 394 insertions(+), 26 deletions(-)
diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 4b8b7e1..8946335 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -51,6 +51,10 @@
<artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
+ <groupId>org.jctools</groupId>
+ <artifactId>jctools-core</artifactId>
+ </dependency>
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<scope>provided</scope>
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
new file mode 100644
index 0000000..619c0ed
--- /dev/null
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BlockingMpscQueue.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.util.Collection;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.jctools.queues.MpscArrayQueue;
+
+/**
+ * Blocking queue optimized for multiple producers and single consumer.
+ */
+public class BlockingMpscQueue<T> extends MpscArrayQueue<T> implements
BlockingQueue<T> {
+
+ public BlockingMpscQueue(int size) {
+ super(size);
+ }
+
+ @Override
+ public void put(T e) throws InterruptedException {
+ while (!this.relaxedOffer(e)) {
+ // Do busy-spin loop
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ }
+ }
+
+ @Override
+ public boolean offer(T e, long timeout, TimeUnit unit) throws
InterruptedException {
+ long absoluteEndTime = System.nanoTime() + unit.toNanos(timeout);
+
+ while (!this.relaxedOffer(e)) {
+ // Do busy-spin loop
+
+ if (System.nanoTime() > absoluteEndTime) {
+ return false;
+ }
+
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public T take() throws InterruptedException {
+ int idleCounter = 0;
+ while (true) {
+ T item = relaxedPoll();
+ if (item == null) {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ idleCounter = WAIT_STRATEGY.idle(idleCounter);
+ continue;
+ }
+
+
+ return item;
+ }
+ }
+
+ @Override
+ public T poll(long timeout, TimeUnit unit) throws InterruptedException {
+ long absoluteEndTime = System.nanoTime() + unit.toNanos(timeout);
+
+ int idleCounter = 0;
+ while (true) {
+ T item = relaxedPoll();
+ if (item == null) {
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+
+ if (System.nanoTime() > absoluteEndTime) {
+ return null;
+ } else {
+ idleCounter = WAIT_STRATEGY.idle(idleCounter);
+ continue;
+ }
+ }
+
+ return item;
+ }
+ }
+
+ @Override
+ public int remainingCapacity() {
+ return capacity() - size();
+ }
+
+ @Override
+ public int drainTo(Collection<? super T> c) {
+ int initialSize = c.size();
+
+ final DrainStrategy ds = new DrainStrategy();
+ drain(c::add, ds, ds);
+ return c.size() - initialSize;
+ }
+
+ @Override
+ public int drainTo(Collection<? super T> c, int maxElements) {
+ return drain(c::add, maxElements);
+ }
+
+ /**
+ * Wait strategy combined with exit condition, for draining the queue.
+ */
+ private static final class DrainStrategy implements WaitStrategy,
ExitCondition {
+
+ boolean reachedEnd = false;
+
+ @Override
+ public boolean keepRunning() {
+ return !reachedEnd;
+ }
+
+ @Override
+ public int idle(int idleCounter) {
+ reachedEnd = true;
+ return idleCounter;
+ }
+
+ }
+
+ /**
+ * Waiting strategy that busy-spins on every idle iteration, using the JVM
spin-wait hint when available; it never falls back to sleeping.
+ */
+ private static final WaitStrategy SPIN_STRATEGY = new WaitStrategy() {
+
+ @Override
+ public int idle(int idleCounter) {
+ BusyWait.onSpinWait();
+ return idleCounter + 1;
+ }
+ };
+
+ private static final WaitStrategy WAIT_STRATEGY = SPIN_STRATEGY;
+}
diff --git
a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BusyWait.java
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BusyWait.java
new file mode 100644
index 0000000..b44a9f6
--- /dev/null
+++
b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/BusyWait.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.collections;
+
+import java.lang.invoke.MethodHandle;
+import java.lang.invoke.MethodHandles;
+import java.lang.invoke.MethodType;
+
+import lombok.experimental.UtilityClass;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Utility class to use "Thread.onSpinWait()" when available.
+ */
+@UtilityClass
+@Slf4j
+public class BusyWait {
+
+ /**
+ * If available (Java 9+), use intrinsic {@link Thread#onSpinWait} which
will
+ * reduce CPU consumption during the wait, otherwise fall back to regular
+ * spinning.
+ */
+ public static void onSpinWait() {
+ if (ON_SPIN_WAIT != null) {
+ try {
+ ON_SPIN_WAIT.invokeExact();
+ } catch (Throwable t) {
+ // Ignore
+ }
+ }
+ }
+
+ private static final MethodHandle ON_SPIN_WAIT;
+
+ static {
+ MethodHandle handle = null;
+ try {
+ handle = MethodHandles.lookup().findStatic(Thread.class,
"onSpinWait", MethodType.methodType(void.class));
+ } catch (Throwable t) {
+ // Ignore
+ if (log.isDebugEnabled()) {
+ log.debug("Unable to use 'onSpinWait' from JVM", t);
+ }
+ }
+
+ ON_SPIN_WAIT = handle;
+ }
+}
diff --git
a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BlockingMpscQueueTest.java
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BlockingMpscQueueTest.java
new file mode 100644
index 0000000..b778001
--- /dev/null
+++
b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/BlockingMpscQueueTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link BlockingMpscQueue}.
+ */
+public class BlockingMpscQueueTest {
+
+ @Test
+ public void basicTest() throws Exception {
+ final int size = 15;
+ BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+ for (int i = 0; i < size; i++) {
+ queue.put(i);
+
+ assertEquals(size - i, queue.remainingCapacity());
+ }
+
+ assertEquals(size, queue.size());
+
+ for (int i = 0; i < size; i++) {
+ Integer n = queue.take();
+ assertTrue(n != null);
+ }
+
+ assertEquals(0, queue.size());
+
+ Integer res = queue.poll(100, TimeUnit.MILLISECONDS);
+ assertNull(res);
+ }
+
+ @Test
+ public void testOffer() throws Exception {
+ final int size = 16;
+ BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+ for (int i = 0; i < size; i++) {
+ assertTrue(queue.offer(1, 100, TimeUnit.MILLISECONDS));
+ }
+
+ assertEquals(size, queue.size());
+
+ assertFalse(queue.offer(1, 100, TimeUnit.MILLISECONDS));
+ assertEquals(size, queue.size());
+ }
+
+ @Test
+ public void testDrain() throws Exception {
+ final int size = 10;
+ BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+ for (int i = 0; i < size; i++) {
+ queue.put(i);
+ }
+
+ List<Integer> list = new ArrayList<>(size);
+ queue.drainTo(list);
+
+ assertEquals(size, list.size());
+
+ assertEquals(0, queue.size());
+
+ Integer res = queue.poll(100, TimeUnit.MILLISECONDS);
+ assertNull(res);
+ }
+
+ @Test
+ public void testDrainWithLimit() throws Exception {
+ final int size = 10;
+ BlockingQueue<Integer> queue = new BlockingMpscQueue<>(size);
+
+ for (int i = 0; i < size; i++) {
+ queue.put(i);
+ }
+
+ List<Integer> list = new ArrayList<>();
+ queue.drainTo(list, 5);
+ assertEquals(5, list.size());
+
+ assertEquals(5, queue.size());
+ }
+}
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
index 175050d..a1863d1 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-all.bin.txt
@@ -314,6 +314,7 @@ Apache Software License, Version 2.
- lib/org.inferred-freebuilder-1.14.9.jar [47]
- lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [48]
- lib/org.apache.yetus-audience-annotations-0.5.0.jar [49]
+- lib/org.jctools-jctools-core-2.1.2.jar [50]
[1] Source available at
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.9.7
[2] Source available at
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.9.7
@@ -363,6 +364,7 @@ Apache Software License, Version 2.
[47] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
[48] Source available at https://github.com/google/error-prone/tree/v2.1.2
[49] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
+[50] Source available at https://github.com/JCTools/JCTools/tree/v2.1.2
------------------------------------------------------------------------------------
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
b/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
index 486a35a..692e9a0 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-bkctl.bin.txt
@@ -258,6 +258,7 @@ Apache Software License, Version 2.
- lib/org.inferred-freebuilder-1.14.9.jar [34]
- lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [35]
- lib/org.apache.yetus-audience-annotations-0.5.0.jar [36]
+- lib/org.jctools-jctools-core-2.1.2.jar [37]
[1] Source available at
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
[2] Source available at
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -287,6 +288,7 @@ Apache Software License, Version 2.
[34] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
[35] Source available at https://github.com/google/error-prone/tree/v2.1.2
[36] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
+[37] Source available at https://github.com/JCTools/JCTools/tree/v2.1.2
------------------------------------------------------------------------------------
lib/io.netty-netty-codec-4.1.31.Final.jar bundles some 3rd party dependencies
diff --git a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
index 62b4f19..a8b5b88 100644
--- a/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
+++ b/bookkeeper-dist/src/main/resources/LICENSE-server.bin.txt
@@ -278,6 +278,7 @@ Apache Software License, Version 2.
- lib/org.inferred-freebuilder-1.14.9.jar [34]
- lib/com.google.errorprone-error_prone_annotations-2.1.2.jar [35]
- lib/org.apache.yetus-audience-annotations-0.5.0.jar [36]
+- lib/org.jctools-jctools-core-2.1.2.jar [37]
[1] Source available at
https://github.com/FasterXML/jackson-annotations/tree/jackson-annotations-2.8.9
[2] Source available at
https://github.com/FasterXML/jackson-core/tree/jackson-core-2.8.9
@@ -315,6 +316,7 @@ Apache Software License, Version 2.
[34] Source available at https://github.com/inferred/FreeBuilder/tree/v1.14.9
[35] Source available at https://github.com/google/error-prone/tree/v2.1.2
[36] Source available at https://github.com/apache/yetus/tree/rel/0.5.0
+[37] Source available at https://github.com/JCTools/JCTools/tree/v2.1.2
------------------------------------------------------------------------------------
lib/io.netty-netty-codec-4.1.31.Final.jar bundles some 3rd party dependencies
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index d0db80e..cd0b2f7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1133,7 +1133,7 @@ public class Bookie extends BookieCriticalThread {
*/
private void addEntryInternal(LedgerDescriptor handle, ByteBuf entry,
boolean ackBeforeSync, WriteCallback cb,
Object ctx, byte[] masterKey)
- throws IOException, BookieException {
+ throws IOException, BookieException, InterruptedException {
long ledgerId = handle.getLedgerId();
long entryId = handle.addEntry(entry);
@@ -1170,7 +1170,7 @@ public class Bookie extends BookieCriticalThread {
* is not exposed to users.
*/
public void recoveryAddEntry(ByteBuf entry, WriteCallback cb, Object ctx,
byte[] masterKey)
- throws IOException, BookieException {
+ throws IOException, BookieException, InterruptedException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
int entrySize = 0;
@@ -1208,7 +1208,7 @@ public class Bookie extends BookieCriticalThread {
}
public void setExplicitLac(ByteBuf entry, WriteCallback writeCallback,
Object ctx, byte[] masterKey)
- throws IOException, BookieException {
+ throws IOException, InterruptedException, BookieException {
try {
long ledgerId = entry.getLong(entry.readerIndex());
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
@@ -1254,7 +1254,7 @@ public class Bookie extends BookieCriticalThread {
* @throws BookieException.LedgerFencedException if the ledger is fenced
*/
public void addEntry(ByteBuf entry, boolean ackBeforeSync, WriteCallback
cb, Object ctx, byte[] masterKey)
- throws IOException, BookieException.LedgerFencedException,
BookieException {
+ throws IOException, BookieException.LedgerFencedException,
BookieException, InterruptedException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
int entrySize = 0;
@@ -1313,7 +1313,8 @@ public class Bookie extends BookieCriticalThread {
* This method is idempotent. Once a ledger is fenced, it can
* never be unfenced. Fencing a fenced ledger has no effect.
*/
- public SettableFuture<Boolean> fenceLedger(long ledgerId, byte[]
masterKey) throws IOException, BookieException {
+ public SettableFuture<Boolean> fenceLedger(long ledgerId, byte[] masterKey)
+ throws IOException, BookieException {
LedgerDescriptor handle = handles.getHandle(ledgerId, masterKey);
return handle.fenceAndLogInJournal(getJournal(ledgerId));
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 2568092..57f05c2 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -870,27 +870,29 @@ public class Journal extends BookieCriticalThread
implements CheckpointSource {
}
}
- public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx) {
+ public void logAddEntry(ByteBuffer entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx)
+ throws InterruptedException {
logAddEntry(Unpooled.wrappedBuffer(entry), ackBeforeSync, cb, ctx);
}
/**
* record an add entry operation in journal.
*/
- public void logAddEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx) {
+ public void logAddEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb, Object ctx)
+ throws InterruptedException {
long ledgerId = entry.getLong(entry.readerIndex() + 0);
long entryId = entry.getLong(entry.readerIndex() + 8);
logAddEntry(ledgerId, entryId, entry, ackBeforeSync, cb, ctx);
}
@VisibleForTesting
- void logAddEntry(long ledgerId, long entryId, ByteBuf entry,
- boolean ackBeforeSync, WriteCallback cb, Object ctx) {
+ void logAddEntry(long ledgerId, long entryId, ByteBuf entry, boolean
ackBeforeSync, WriteCallback cb, Object ctx)
+ throws InterruptedException {
//Retain entry until it gets written to journal
entry.retain();
journalQueueSize.inc();
- queue.add(QueueEntry.create(
+ queue.put(QueueEntry.create(
entry, ackBeforeSync, ledgerId, entryId, cb, ctx,
MathUtils.nowInNano(),
journalAddEntryStats, journalQueueSize));
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index a0f34ea..ee91ed0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -122,18 +122,23 @@ public class LedgerDescriptorImpl extends
LedgerDescriptor {
result = logFenceResult = SettableFuture.create();
}
ByteBuf entry = createLedgerFenceEntry(ledgerId);
- journal.logAddEntry(entry, false /* ackBeforeSync */, (rc, ledgerId,
entryId, addr, ctx) -> {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Record fenced state for ledger {} in journal with
rc {}",
- ledgerId, BKException.codeLogger(rc));
- }
- if (rc == 0) {
- fenceEntryPersisted.compareAndSet(false, true);
- result.set(true);
- } else {
- result.set(false);
- }
- }, null);
+ try {
+ journal.logAddEntry(entry, false /* ackBeforeSync */, (rc,
ledgerId, entryId, addr, ctx) -> {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Record fenced state for ledger {} in journal
with rc {}",
+ ledgerId, BKException.codeLogger(rc));
+ }
+ if (rc == 0) {
+ fenceEntryPersisted.compareAndSet(false, true);
+ result.set(true);
+ } else {
+ result.set(false);
+ }
+ }, null);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ result.setException(e);
+ }
return result;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
index 2f018ff..7e42a73 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/WriteLacProcessorV3.java
@@ -111,6 +111,11 @@ class WriteLacProcessorV3 extends PacketProcessorBaseV3
implements Runnable {
logger.error("Error saving lac {} for ledger:{}",
lac, ledgerId, e);
status = StatusCode.EIO;
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ logger.error("Interrupted while saving lac {} for ledger:{}",
+ lac, ledgerId, e);
+ status = StatusCode.EIO;
} catch (BookieException e) {
logger.error("Unauthorized access to ledger:{} while adding
lac:{}",
ledgerId, lac, e);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
index 850fe5d..eda68c8 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperCloseTest.java
@@ -77,7 +77,7 @@ public class BookKeeperCloseTest extends
BookKeeperClusterTestCase {
@Override
public void recoveryAddEntry(ByteBuf entry, WriteCallback cb,
Object ctx, byte[] masterKey)
- throws IOException, BookieException {
+ throws IOException, BookieException,
InterruptedException {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
@@ -91,7 +91,7 @@ public class BookKeeperCloseTest extends
BookKeeperClusterTestCase {
@Override
public void addEntry(ByteBuf entry, boolean ackBeforeSync,
WriteCallback cb,
Object ctx, byte[] masterKey)
- throws IOException, BookieException {
+ throws IOException, BookieException,
InterruptedException {
try {
Thread.sleep(5000);
} catch (InterruptedException ie) {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index fab13ed..5ba73c6 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -515,8 +515,8 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
}
@Override
- public void addEntry(ByteBuf entry, boolean ackBeforeSync, final
WriteCallback cb,
- Object ctx, byte[] masterKey) throws IOException,
BookieException {
+ public void addEntry(ByteBuf entry, boolean ackBeforeSync, final
WriteCallback cb, Object ctx, byte[] masterKey)
+ throws IOException, BookieException, InterruptedException {
super.addEntry(entry, ackBeforeSync, new WriteCallback() {
@Override
public void writeComplete(int rc, long ledgerId, long entryId,
diff --git a/pom.xml b/pom.xml
index 9903f5a..8f151b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
<twitter-server.version>1.29.0</twitter-server.version>
<vertx.version>3.4.1</vertx.version>
<zookeeper.version>3.4.13</zookeeper.version>
+ <jctools.version>2.1.2</jctools.version>
<!-- plugin dependencies -->
<apache-rat-plugin.version>0.12</apache-rat-plugin.version>
<cobertura-maven-plugin.version>2.7</cobertura-maven-plugin.version>
@@ -584,6 +585,13 @@
<version>${jetty.version}</version>
</dependency>
+ <!-- JCTools -->
+ <dependency>
+ <groupId>org.jctools</groupId>
+ <artifactId>jctools-core</artifactId>
+ <version>${jctools.version}</version>
+ </dependency>
+
<!-- stats dependencies -->
<!-- dropwizard metrics -->
<dependency>