[
https://issues.apache.org/jira/browse/BOOKKEEPER-69?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13149238#comment-13149238
]
Sijie Guo commented on BOOKKEEPER-69:
-------------------------------------
In TopicManager, acquireTopic / releaseTopic each put a TopicOp into topicOpQueue,
and the TopicOps are executed one by one. So if we can guarantee that AcquireOp
and ReleaseOp trigger their callbacks only after the acquire/release has actually
finished, we can avoid the race condition.
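
The guarantee described above can be illustrated with a minimal sketch. This is not Hedwig's actual queue implementation; SerialOpQueue, Op and the demo class are hypothetical names used only to show the idea that the next op starts only when the previous op signals that it has really finished:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a topic op queue; names are illustrative, not Hedwig's API.
class SerialOpQueue {
    /** An op must invoke done.run() exactly once, when its work has really finished. */
    interface Op {
        void run(Runnable done);
    }

    private final Queue<Op> pending = new ArrayDeque<>();
    private boolean running = false;

    /** Enqueue an op; it starts only after every earlier op has signalled completion. */
    synchronized void submit(Op op) {
        pending.add(op);
        if (!running) {
            running = true;
            runNext();
        }
    }

    // Starts the next queued op; also used as the completion hook of the op that just finished.
    private synchronized void runNext() {
        Op next = pending.poll();
        if (next == null) {
            running = false;
            return;
        }
        next.run(this::runNext);
    }
}

class SerialOpQueueDemo {
    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        Runnable[] acquireDone = new Runnable[1];
        SerialOpQueue queue = new SerialOpQueue();

        // The acquire op starts now but finishes later (asynchronously).
        queue.submit(done -> { log.add("acquire started"); acquireDone[0] = done; });
        // The release op is queued behind it and must not start yet.
        queue.submit(done -> { log.add("release ran"); done.run(); });
        System.out.println(log);

        acquireDone[0].run(); // acquire has really finished, so release may run now
        System.out.println(log);
    }
}
```

In this sketch the release op is recorded in the log only after the acquire op's completion hook is invoked. If an op called its hook before its work was really done, the ordering guarantee would be lost, which is the situation this comment analyses.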
In AcquireOp:
It uses a MultiCallback to trigger the original callback once all of its listeners
(SubscriptionManager / PersistenceManager) have finished acquiring the topic, as below:
{code:title=AbstractTopicManager.java}
protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
        final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {
    Callback<Void> postCb = new Callback<Void>() {
        @Override
        public void operationFinished(Object ctx, Void resultOfOperation) {
            topics.add(topic);
            if (cfg.getRetentionSecs() > 0) {
                scheduler.schedule(new Runnable() {
                    @Override
                    public void run() {
                        // Enqueue a release operation. (Recall that release
                        // doesn't "fail" even if the topic is missing.)
                        releaseTopic(topic, new Callback<Void>() {
                            @Override
                            public void operationFailed(Object ctx, PubSubException exception) {
                                logger.error("failure that should never happen when periodically releasing topic "
                                             + topic, exception);
                            }
                            @Override
                            public void operationFinished(Object ctx, Void resultOfOperation) {
                                logger.debug("successful periodic release of topic " + topic);
                            }
                        }, null);
                    }
                }, cfg.getRetentionSecs(), TimeUnit.SECONDS);
            }
            originalCallback.operationFinished(originalContext, addr);
        }
        @Override
        public void operationFailed(Object ctx, PubSubException exception) {
            // TODO: optimization: we can release this as soon as we experience the first error.
            realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
            originalCallback.operationFailed(ctx, exception);
        }
    };
    Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), postCb, null);
    for (TopicOwnershipChangeListener listener : listeners) {
        listener.acquiredTopic(topic, mcb, null);
    }
}
{code}
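
For readers unfamiliar with the multi-callback pattern used above, here is a minimal sketch of the idea. It is an assumption about the general shape only, not the real CallbackUtils.multiCallback; SimpleCallback and CountingCallback are hypothetical names, and the choice to wait for every listener before reporting a failure is inferred from the TODO comment in the code above:

```java
// Hypothetical minimal callback interface; not the Hedwig Callback<T> signature.
interface SimpleCallback {
    void finished();
    void failed(Exception e);
}

// Waits for `expected` completions, then fires the target exactly once:
// finished() if every completion succeeded, failed() with the first error otherwise.
class CountingCallback implements SimpleCallback {
    private final SimpleCallback target;
    private int remaining;
    private Exception firstFailure;

    CountingCallback(int expected, SimpleCallback target) {
        this.remaining = expected;
        this.target = target;
    }

    @Override
    public synchronized void finished() {
        complete(null);
    }

    @Override
    public synchronized void failed(Exception e) {
        complete(e);
    }

    // Called with the lock held; records the first error and fires on the last completion.
    private void complete(Exception e) {
        if (e != null && firstFailure == null) {
            firstFailure = e;
        }
        remaining--;
        if (remaining == 0) {
            if (firstFailure == null) {
                target.finished();
            } else {
                target.failed(firstFailure);
            }
        }
    }
}
```

With two listeners, the target is notified only after both have reported back, which mirrors how postCb is reached only after every TopicOwnershipChangeListener has called mcb.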
If topic acquisition proceeds without any error, the original callback is
triggered only after all the acquisition work is done in every manager.
If topic acquisition hits an error, it calls realReleaseTopic. Since
realReleaseTopic is processed asynchronously, the original callback is
triggered immediately after realReleaseTopic is called, not after it completes.
So the topic manager may be acquiring and releasing the same topic at the same time.
Ivan's patch fixes this issue by triggering the original callback only after
realReleaseTopic has actually finished.
{code}
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
+++ b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
@@ -135,10 +135,19 @@ public abstract class AbstractTopicManager implements TopicManager {
                 }
                 @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
+                public void operationFailed(final Object ctx, final PubSubException exception) {
                     // TODO: optimization: we can release this as soon as we experience the first error.
-                    realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
-                    originalCallback.operationFailed(ctx, exception);
+                    Callback<Void> cb = new Callback<Void>() {
+                        public void operationFinished(Object _ctx, Void _resultOfOperation) {
+                            originalCallback.operationFailed(ctx, exception);
+                        }
+                        public void operationFailed(Object _ctx, PubSubException _exception) {
+                            logger.error("Exception releasing topic", _exception);
+                            originalCallback.operationFailed(ctx, exception);
+                        }
+                    };
+
+                    realReleaseTopic(topic, cb, originalContext);
                 }
             };
{code}
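
The difference between the old and new behaviour can be reduced to a small sketch. It is a simplified illustration, not the Hedwig code: FakeReleaser, failEagerly and failAfterRelease are hypothetical names, and the asynchronous release is simulated by holding on to its completion hook until the test completes it by hand:

```java
import java.util.ArrayList;
import java.util.List;

class DeferredFailureDemo {
    // Hypothetical asynchronous release: it stores the completion hook instead of running it.
    static class FakeReleaser {
        private Runnable pending;

        void releaseAsync(Runnable onReleased) {
            pending = onReleased;
        }

        // Simulates the moment at which the asynchronous release really finishes.
        void completeRelease() {
            Runnable hook = pending;
            pending = null;
            hook.run();
        }
    }

    // Old behaviour: the caller is notified as soon as the release has been *started*.
    static void failEagerly(FakeReleaser releaser, List<String> log) {
        releaser.releaseAsync(() -> log.add("released"));
        log.add("caller notified");
    }

    // Patched behaviour: the caller is notified from the release's completion hook.
    static void failAfterRelease(FakeReleaser releaser, List<String> log) {
        releaser.releaseAsync(() -> {
            log.add("released");
            log.add("caller notified");
        });
    }

    public static void main(String[] args) {
        List<String> eager = new ArrayList<>();
        FakeReleaser r1 = new FakeReleaser();
        failEagerly(r1, eager);
        r1.completeRelease();
        System.out.println("eager:    " + eager);

        List<String> deferred = new ArrayList<>();
        FakeReleaser r2 = new FakeReleaser();
        failAfterRelease(r2, deferred);
        r2.completeRelease();
        System.out.println("deferred: " + deferred);
    }
}
```

In the eager variant the caller is told about the failure while the release is still in flight, so the next queued op for the same topic could start too early. In the deferred variant the notification always follows the release.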
In realReleaseTopic, the listeners (SubscriptionManager/PersistenceManager)
still handle losing the topic asynchronously. But this does not reintroduce the
race, since operations in SubscriptionManager/PersistenceManager are still
processed in a queue, one by one, which means a newer acquire op will not be
executed until the older release op has been executed.
{code}
// AbstractTopicManager.java
private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
    for (TopicOwnershipChangeListener listener : listeners)
        listener.lostTopic(topic);
    topics.remove(topic);
    postReleaseCleanup(topic, callback, ctx);
}
{code}
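
The queueing argument above can be sketched as follows. This is only an illustration under the assumption that each listener handles its work on a single FIFO queue; QueuedListener is a hypothetical class and a single-threaded executor stands in for whatever queue the real SubscriptionManager/PersistenceManager use:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical listener whose operations are handled one by one, in submission order.
class QueuedListener {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Returns immediately; the actual cleanup happens later on the worker thread.
    void lostTopic(String topic) {
        worker.execute(() -> log.add("lost:" + topic));
    }

    // Queued behind any earlier lostTopic, so it cannot overtake the release.
    void acquiredTopic(String topic, Runnable done) {
        worker.execute(() -> {
            log.add("acquired:" + topic);
            done.run();
        });
    }

    // Stops accepting work and waits for the queued operations to drain.
    boolean shutdownAndWait() {
        worker.shutdown();
        try {
            return worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Even though lostTopic returns before the cleanup has run, a later acquiredTopic for the same topic is processed after it, because both go through the same single-threaded queue.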
So Ivan's patch does fix this issue. It is simple and clear. Thanks, Ivan.
> ServerRedirectLoopException when a machine (hosts bookie server & hub server)
> reboot, which is caused by race condition of topic manager
> ----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: BOOKKEEPER-69
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-69
> Project: Bookkeeper
> Issue Type: Bug
> Components: hedwig-client, hedwig-server
> Affects Versions: 4.0.0
> Environment: 3 machines (perf8, perf9, perf10), each machine hosts a
> bookie server & a hub server.
> perf8 is used as default server for client 1. perf9 is used as default server
> for client 2.
> bookkeeper is configured as below:
> ensemble size is 3, quorum size is 2.
> Reporter: Sijie Guo
> Assignee: Sijie Guo
> Priority: Critical
> Fix For: 4.0.0
>
> Attachments: BOOKKEEPER-69.possiblefix.diff,
> bookkeeper-69-testcase.patch, bookkeeper-69.patch, bookkeeper-69.patch
>
>
> 1) machine perf10 is rebooted. the bookie server & hub server are not
> restarted automatically after reboot.
> 2) client 1 & client 2 are still running. the topics owned in perf10 will be
> re-assigned to perf8/perf9. but they would fail because not enough bookie
> servers are available.
> 3) after 2 hours, we found that perf10 is rebooted. we restarted bookie
> server & hub server on perf10
> 4) then we got ServerRedirectLoopException in client.