[
https://issues.apache.org/jira/browse/BOOKKEEPER-461?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13496659#comment-13496659
]
Flavio Junqueira commented on BOOKKEEPER-461:
---------------------------------------------
Hi Sijie, I have a few questions about this patch:
# With enqueueWithoutFailureByTopic, does it make sense to keep
enqueueWithoutFailure?
# I'm not entirely sure why you want some of the operations to execute in
separate threads. For example, in this block:
{code}
enqueueWithoutFailure(new CacheRequest() {
    @Override
    public void performRequest() {
        // maintain the index of seq-id
        MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
                                 cacheKey.getSeqId(), TreeSetLongFactory.instance);
        // maintain the time index of addition
        MapMethods.addToMultiMap(timeIndexOfAddition, currTime,
                                 cacheKey, HashSetCacheKeyFactory.instance);
        // update time index
        if (newCacheSize > maxCacheSize) {
            collectOldCacheEntries();
        }
    }
});
{code}
is it the case that the MapMethods operations are expensive?
# In the following excerpt from your patch, you're inverting the order of
execution and I couldn't convince myself that it is not a problem (see the
sketch after this list):
{code}
-        // maintain the time index of addition
-        MapMethods.addToMultiMap(timeIndexOfAddition, currTime, cacheKey,
-                                 HashSetCacheKeyFactory.instance);
-
-        // maintain the index of seq-id
-        MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId(),
-                                 TreeSetLongFactory.instance);
-
-        // finally add the message to the cache
-        cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+        synchronized (cacheValue) {
+            // finally add the message to the cache
+            cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+        }
         // if overgrown, collect old entries
-        collectOldCacheEntries();
+        enqueueWithoutFailure(new CacheRequest() {
+            @Override
+            public void performRequest() {
+                // maintain the index of seq-id
+                MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(),
+                                         cacheKey.getSeqId(), TreeSetLongFactory.instance);
+
+                // maintain the time index of addition
+                MapMethods.addToMultiMap(timeIndexOfAddition, currTime,
+                                         cacheKey, HashSetCacheKeyFactory.instance);
+                // update time index
+                if (newCacheSize > maxCacheSize) {
+                    collectOldCacheEntries();
+                }
+            }
+        });
{code}
# Please update this: "so dont need to maintain" -> "so don't need to maintain"
# In the graph you posted last, what are the sharp drops? Is that you
restarting the hub?
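
To make my concern in point 3 concrete, here is a minimal, self-contained sketch
of the window the reordering opens up. The class and field names are made up for
illustration (this is not the actual ReadAheadCache code): once
setMessageAndInvokeCallbacks has run, the message has been handed to its
callbacks, but the seq-id/time indexes are only updated when the cache thread
eventually gets to the queued request, so anything consulting those indexes in
between will not see the new entry.
{code}
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OrderInversionSketch {
    // stands in for timeIndexOfAddition: addition time -> cache key
    static final NavigableMap<Long, String> timeIndex = new ConcurrentSkipListMap<>();
    // stands in for the single cache maintenance thread
    static final ExecutorService cacheWorker = Executors.newSingleThreadExecutor();

    public static void main(String[] args) throws Exception {
        // keep the worker busy, the way a crowded request queue would
        cacheWorker.submit(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        final long currTime = System.currentTimeMillis();
        final String cacheKey = "topic-1:seq-42";

        // patched order: deliver first ...
        System.out.println("deliver " + cacheKey + " to its callbacks");
        // ... and maintain the index later, on the cache thread
        cacheWorker.submit(() -> {
            timeIndex.put(currTime, cacheKey);
        });

        // anything reading the index in this window does not see the entry yet
        System.out.println("entries visible right after delivery: " + timeIndex.size());

        cacheWorker.shutdown();
        cacheWorker.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("entries after the queue is drained: " + timeIndex.size());
    }
}
{code}
Whether that window actually matters presumably depends on the indexes only ever
being read from the cache thread itself, which is the part I couldn't convince
myself of.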
> Delivery throughput degrades when there are lots of publishers w/ high
> traffic.
> -------------------------------------------------------------------------------
>
> Key: BOOKKEEPER-461
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-461
> Project: Bookkeeper
> Issue Type: Bug
> Reporter: Sijie Guo
> Assignee: Sijie Guo
> Fix For: 4.2.0
>
> Attachments: BOOKKEEPER-461.diff, BOOKKEEPER-461.diff,
> pub_sub_multithreads.png, pub_sub_singlethread.png
>
>
> When benchmarking the hub server, we found that delivery throughput degrades
> when there are lots of publishers publishing messages, and that delivery
> throughput goes back up when there are no publishes.
> This issue is introduced because ReadAheadCache runs only a single thread. When
> the netty workers are busy handling publish requests, they push lots of
> messages into ReadAheadCache's queue to be added to the read-ahead cache, so
> the read-ahead cache is kept busy updating keys.
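
For reference, here is a toy model of the bottleneck described above (the names
are made up, not the actual hub server code): a single cache thread draining a
request queue that publishers keep flooding, with a delivery-driven read-ahead
request stuck behind the backlog.
{code}
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadCacheQueueSketch {
    public static void main(String[] args) throws Exception {
        // the single ReadAheadCache-style worker thread
        ExecutorService cacheThread = Executors.newSingleThreadExecutor();
        final ConcurrentHashMap<Integer, byte[]> cache = new ConcurrentHashMap<>();

        // netty workers enqueue lots of "add published message to cache" work
        final int publishBurst = 100_000;
        for (int i = 0; i < publishBurst; i++) {
            final int seqId = i;
            cacheThread.submit(() -> {
                cache.put(seqId, new byte[128]); // stand-in for caching a published message
            });
        }

        // a delivery-driven read-ahead request enqueued behind the burst
        final long enqueuedAt = System.nanoTime();
        cacheThread.submit(() -> {
            long waitedMs = (System.nanoTime() - enqueuedAt) / 1_000_000;
            System.out.println("read-ahead request waited " + waitedMs + " ms in the queue");
        });

        cacheThread.shutdown();
        cacheThread.awaitTermination(30, TimeUnit.SECONDS);
    }
}
{code}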