This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new a32a7f0 Fixed simultaneous reads on same ledger/entry with v2 protocol
a32a7f0 is described below
commit a32a7f0eabf36ddc8310e9d3b506f2ab277102f1
Author: Ivan Kelly <[email protected]>
AuthorDate: Mon Dec 4 14:10:04 2017 -0800
Fixed simultaneous reads on same ledger/entry with v2 protocol
This change comes from the yahoo branch at yahoo/bookkeeper@ce7b0f7
With the v2 protocol there's no way to disambiguate read requests, since the
completion key is made only from (ledgerId, entryId).
This PR introduces an additional multimap to handle the conflicts (multiple
simultaneous read requests for the same entry).
Author: Ivan Kelly <[email protected]>
Author: Matteo Merli <[email protected]>
Reviewers: Jia Zhai <None>, Sijie Guo <[email protected]>
This closes #720 from merlimat/concurrent-read-v2
---
.../bookkeeper/proto/PerChannelBookieClient.java | 154 ++++++++++++---------
.../apache/bookkeeper/client/BookKeeperTest.java | 70 +++++++++-
2 files changed, 156 insertions(+), 68 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index d470fc0..3aeddb2 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -20,46 +20,6 @@ package org.apache.bookkeeper.proto;
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Sets;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ExtensionRegistry;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
-import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
-import io.netty.buffer.UnpooledByteBufAllocator;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler.Sharable;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.DefaultEventLoopGroup;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.WriteBufferWaterMark;
-import io.netty.channel.epoll.EpollEventLoopGroup;
-import io.netty.channel.epoll.EpollSocketChannel;
-import io.netty.channel.local.LocalChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.CorruptedFrameException;
-import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
-import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.util.HashedWheelTimer;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
import java.io.IOException;
import java.net.SocketAddress;
import java.security.cert.Certificate;
@@ -121,6 +81,48 @@ import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Joiner;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Sets;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ExtensionRegistry;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultEventLoopGroup;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollSocketChannel;
+import io.netty.channel.local.LocalChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.CorruptedFrameException;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import io.netty.handler.codec.LengthFieldPrepender;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.HashedWheelTimer;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+
/**
* This class manages all details of connection to a particular bookie. It also
* has reconnect logic if a connection to a bookie fails.
@@ -155,6 +157,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
private final ConcurrentHashMap<CompletionKey, CompletionValue>
completionObjects =
new ConcurrentHashMap<CompletionKey, CompletionValue>();
+ // Map that holds duplicated read requests. The idea is to only use this
map (synchronized) when there is a duplicate
+ // read request for the same ledgerId/entryId
+ private final ListMultimap<CompletionKey, CompletionValue>
completionObjectsV2Conflicts =
+ LinkedListMultimap.create();
+
private final StatsLogger statsLogger;
private final OpStatsLogger readEntryOpLogger;
private final OpStatsLogger readTimeoutOpLogger;
@@ -604,14 +611,13 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.build();
}
- CompletionValue completion = new ReadCompletion(completionKey,
- cb, ctx,
- ledgerId, entryId);
- if (completionObjects.putIfAbsent(
- completionKey, completion) != null) {
- // We cannot have more than 1 pending read on the same
ledger/entry in the v2 protocol
-
completion.errorOut(BKException.Code.BookieHandleNotAvailableException);
- return;
+ ReadCompletion readCompletion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId);
+ CompletionValue existingValue =
completionObjects.putIfAbsent(completionKey, readCompletion);
+ if (existingValue != null) {
+ // There's a pending read request on same ledger/entry. Use the
multimap to track all of them
+ synchronized (completionObjectsV2Conflicts) {
+ completionObjectsV2Conflicts.put(completionKey,
readCompletion);
+ }
}
writeAndFlush(channel, completionKey, request);
@@ -726,15 +732,13 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
.build();
}
- CompletionValue completion = new ReadCompletion(completionKey, cb,
- ctx, ledgerId,
entryId);
- CompletionValue existingValue = completionObjects.putIfAbsent(
- completionKey, completion);
+ ReadCompletion readCompletion = new ReadCompletion(completionKey, cb,
ctx, ledgerId, entryId);
+ CompletionValue existingValue =
completionObjects.putIfAbsent(completionKey, readCompletion);
if (existingValue != null) {
- // There's a pending read request on same ledger/entry. This is
not supported in V2 protocol
- LOG.warn("Failing concurrent request to read at ledger: {} entry:
{}", ledgerId, entryId);
- completion.errorOut(BKException.Code.UnexpectedConditionException);
- return;
+ // There's a pending read request on same ledger/entry. Use the
multimap to track all of them
+ synchronized (completionObjectsV2Conflicts) {
+ completionObjectsV2Conflicts.put(completionKey,
readCompletion);
+ }
}
writeAndFlush(channel, completionKey, request);
@@ -868,6 +872,16 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
CompletionValue completion = completionObjects.remove(key);
if (completion != null) {
completion.errorOut(rc);
+ } else {
+ // If there's no completion object here, try in the multimap
+ synchronized (completionObjectsV2Conflicts) {
+ if (completionObjectsV2Conflicts.containsKey(key)) {
+ completion = completionObjectsV2Conflicts.get(key).get(0);
+ completionObjectsV2Conflicts.remove(key, completion);
+
+ completion.errorOut(rc);
+ }
+ }
}
}
@@ -879,13 +893,17 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
*/
void errorOutOutstandingEntries(int rc) {
-
// DO NOT rewrite these using Map.Entry iterations. We want to iterate
// on keys and see if we are successfully able to remove the key from
// the map. Because the add and the read methods also do the same thing
// in case they get a write failure on the socket. The one who
// successfully removes the key from the map is the one responsible for
// calling the application callback.
+ for (CompletionKey key : completionObjectsV2Conflicts.keySet()) {
+ while (completionObjectsV2Conflicts.get(key).size() > 0) {
+ errorOut(key, rc);
+ }
+ }
for (CompletionKey key : completionObjects.keySet()) {
errorOut(key, rc);
}
@@ -990,8 +1008,17 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
final StatusCode status =
getStatusCodeFromErrorCode(response.errorCode);
final CompletionKey key = acquireV2Key(ledgerId, entryId,
operationType);
- final CompletionValue completionValue = completionObjects.remove(key);
+ CompletionValue completionValue = completionObjects.remove(key);
key.release();
+ if (completionValue == null) {
+ // If there's no completion object here, try in the multimap
+ synchronized (this) {
+ if (completionObjectsV2Conflicts.containsKey(key)) {
+ completionValue =
completionObjectsV2Conflicts.get(key).get(0);
+ completionObjectsV2Conflicts.remove(key, completionValue);
+ }
+ }
+ }
if (null == completionValue) {
// Unexpected response, so log it. The txnId should have been
present.
@@ -1001,14 +1028,11 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
} else {
long orderingKey = completionValue.ledgerId;
+ final CompletionValue finalCompletionValue = completionValue;
- executor.submitOrdered(orderingKey, new SafeRunnable() {
- @Override
- public void safeRun() {
- completionValue.handleV2Response(ledgerId, entryId,
- status, response);
- response.recycle();
- }
+ executor.submitOrdered(orderingKey, () -> {
+ finalCompletionValue.handleV2Response(ledgerId, entryId,
status, response);
+ response.recycle();
});
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
index 3527575..547b367 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java
@@ -400,11 +400,11 @@ public class BookKeeperTest extends
BookKeeperClusterTestCase {
Assert.assertTrue(
"Expected LAC of wlh: " + (2 * numOfEntries - 1) + " actual
LAC of wlh: " + wlh.getLastAddConfirmed(),
(wlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
- // readhandle's lastaddconfirmed wont be updated until
readExplicitLastConfirmed call is made
+ // readhandle's lastaddconfirmed wont be updated until
readExplicitLastConfirmed call is made
Assert.assertTrue(
"Expected LAC of rlh: " + (numOfEntries - 2) + " actual LAC of
rlh: " + rlh.getLastAddConfirmed(),
(rlh.getLastAddConfirmed() == (numOfEntries - 2)));
-
+
long explicitlac = rlh.readExplicitLastConfirmed();
Assert.assertTrue(
"Expected Explicit LAC of rlh: " + (2 * numOfEntries - 1) + "
actual ExplicitLAC of rlh: " + explicitlac,
@@ -413,7 +413,7 @@ public class BookKeeperTest extends
BookKeeperClusterTestCase {
Assert.assertTrue(
"Expected LAC of rlh: " + (2 * numOfEntries - 1) + " actual
LAC of rlh: " + rlh.getLastAddConfirmed(),
(rlh.getLastAddConfirmed() == (2 * numOfEntries - 1)));
-
+
Enumeration<LedgerEntry> entries = rlh.readEntries(numOfEntries, 2 *
numOfEntries - 1);
int entryId = numOfEntries;
while (entries.hasMoreElements()) {
@@ -818,4 +818,68 @@ public class BookKeeperTest extends
BookKeeperClusterTestCase {
}
}
}
+
+    /**
+     * Tests that issuing multiple concurrent reads for the same entry succeeds
+     * with the default wire protocol.
+     *
+     * @throws Exception if creating the ledger, adding the entry or waiting fails
+     */
+    @Test
+    public void testDoubleRead() throws Exception {
+        LedgerHandle lh = bkc.createLedger(digestType, "".getBytes());
+
+        lh.addEntry("test".getBytes());
+
+        // Read the same entry more times asynchronously
+        readSameEntryConcurrently(lh, 10);
+    }
+
+    /**
+     * Tests that issuing multiple concurrent reads for the same entry succeeds
+     * with the v2 wire protocol, where pending reads for the same
+     * ledger/entry share one completion key.
+     *
+     * @throws Exception if creating the client or ledger, adding the entry or waiting fails
+     */
+    @Test
+    public void testDoubleReadWithV2Protocol() throws Exception {
+        ClientConfiguration conf = new ClientConfiguration(baseClientConf);
+        conf.setUseV2WireProtocol(true);
+        BookKeeperTestClient v2Client = new BookKeeperTestClient(conf);
+        // Close the dedicated client even when an assertion or the setup fails.
+        try {
+            LedgerHandle lh = v2Client.createLedger(digestType, "".getBytes());
+
+            lh.addEntry("test".getBytes());
+
+            // Read the same entry more times asynchronously
+            readSameEntryConcurrently(lh, 10);
+        } finally {
+            v2Client.close();
+        }
+    }
+
+    /**
+     * Issues {@code numReads} asynchronous reads of entry 0 on {@code lh} at once,
+     * waits for every callback, and asserts that all of them returned OK.
+     *
+     * <p>Failures are recorded in the callback and asserted on the test thread:
+     * calling {@code fail()} inside the callback would throw on the client's
+     * callback thread, never reach JUnit, and leave the latch waiting forever.
+     *
+     * @param lh ledger handle whose entry 0 has already been written
+     * @param numReads number of simultaneous reads to issue
+     * @throws Exception if interrupted while waiting for the callbacks
+     */
+    private static void readSameEntryConcurrently(LedgerHandle lh, int numReads) throws Exception {
+        final CountDownLatch latch = new CountDownLatch(numReads);
+        final java.util.concurrent.atomic.AtomicInteger failures =
+                new java.util.concurrent.atomic.AtomicInteger();
+        final java.util.concurrent.atomic.AtomicInteger firstErrorRc =
+                new java.util.concurrent.atomic.AtomicInteger(BKException.Code.OK);
+        for (int i = 0; i < numReads; i++) {
+            lh.asyncReadEntries(0, 0, new ReadCallback() {
+                @Override
+                public void readComplete(int rc, LedgerHandle handle,
+                                         Enumeration<LedgerEntry> seq, Object ctx) {
+                    if (rc != BKException.Code.OK) {
+                        failures.incrementAndGet();
+                        firstErrorRc.compareAndSet(BKException.Code.OK, rc);
+                    }
+                    // Always count down so the test thread is released on failure too.
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        Assert.assertTrue("Timed out waiting for " + numReads + " concurrent reads",
+                latch.await(30, java.util.concurrent.TimeUnit.SECONDS));
+        Assert.assertEquals("Concurrent reads failed, first rc=" + firstErrorRc.get(),
+                0, failures.get());
+    }
}
--
To stop receiving notification emails like this one, please contact
['"[email protected]" <[email protected]>'].