[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-69?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13149238#comment-13149238
 ] 

Sijie Guo commented on BOOKKEEPER-69:
-------------------------------------

In TopicManager, acquireTopic / releaseTopic each put a TopicOp in topicOpQueue, 
and the TopicOps are executed one by one. So if we can guarantee that AcquireOp 
and ReleaseOp trigger their callbacks only after the acquire/release has actually 
finished, we can avoid the race condition.
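
To make the guarantee concrete, here is a minimal sketch of a queue that starts the next op only after the previous op has signalled completion. SerializedOpQueue, Op and the done handle are hypothetical names for illustration; this is not the actual topicOpQueue code in Hedwig.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class SerializedOpQueue {
    /** Hypothetical stand-in for TopicOp: do the work, then call done.run() exactly once. */
    public interface Op {
        void run(Runnable done);
    }

    private final Queue<Op> queue = new ArrayDeque<>();
    private boolean running = false;

    /** Enqueue an op; start draining only if no op is currently in flight. */
    public void submit(Op op) {
        boolean start;
        synchronized (this) {
            queue.add(op);
            start = !running;
            if (start) {
                running = true;
            }
        }
        if (start) {
            drain();
        }
    }

    /** Run the next queued op; it re-enters drain() through its done handle. */
    private void drain() {
        final Op next;
        synchronized (this) {
            next = queue.poll();
            if (next == null) {
                running = false;
                return;
            }
        }
        next.run(this::drain);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        List<Runnable> pending = new ArrayList<>(); // simulates work finishing later
        SerializedOpQueue q = new SerializedOpQueue();

        // The acquire finishes asynchronously, so it parks its completion in pending.
        q.submit(done -> {
            log.add("acquire-start");
            pending.add(() -> { log.add("acquire-done"); done.run(); });
        });
        // The release is queued behind the acquire and must not start yet.
        q.submit(done -> { log.add("release-start"); done.run(); });

        pending.get(0).run(); // the acquire really finishes here
        System.out.println(log); // [acquire-start, acquire-done, release-start]
    }
}
```

The serialization only holds if every op calls done after its work has really completed. An op that calls done early lets the next op start while it is still working, which is the kind of race discussed in this issue.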

In AcquireOp:

It uses a MultiCallback to trigger the original callback once its listeners 
(SubscriptionManager / PersistenceManager) have finished acquiring the topic, 
as below:

{code:title=AbstractTopicManager.java}
    protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
            final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {

        Callback<Void> postCb = new Callback<Void>() {

            @Override
            public void operationFinished(Object ctx, Void resultOfOperation) {
                topics.add(topic);
                if (cfg.getRetentionSecs() > 0) {
                    scheduler.schedule(new Runnable() {
                        @Override
                        public void run() {
                            // Enqueue a release operation. (Recall that release
                            // doesn't "fail" even if the topic is missing.)
                            releaseTopic(topic, new Callback<Void>() {

                                @Override
                                public void operationFailed(Object ctx, PubSubException exception) {
                                    logger.error("failure that should never happen when periodically releasing topic "
                                                 + topic, exception);
                                }

                                @Override
                                public void operationFinished(Object ctx, Void resultOfOperation) {
                                    logger.debug("successful periodic release of topic " + topic);
                                }

                            }, null);
                        }
                    }, cfg.getRetentionSecs(), TimeUnit.SECONDS);
                }
                originalCallback.operationFinished(originalContext, addr);
            }

            @Override
            public void operationFailed(Object ctx, PubSubException exception) {
                // TODO: optimization: we can release this as soon as we experience the first error.
                realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
                originalCallback.operationFailed(ctx, exception);
            }
        };

        Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), postCb, null);
        for (TopicOwnershipChangeListener listener : listeners) {
            listener.acquiredTopic(topic, mcb, null);
        }
    }
{code}

If topic acquisition proceeds without any error, the original callback is 
triggered only after every manager has finished its acquisition work.
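
The "wait for every listener" behaviour can be sketched as a counting callback. This is an illustration only: CountingCallback and Completion are hypothetical names, and the real CallbackUtils.multiCallback may differ in its details. The sketch assumes a failure is reported only after all listeners have answered, which matches the TODO in the code above.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class CountingCallback {
    /** Hypothetical simplified stand-in for Callback<Void>. */
    public interface Completion {
        void onSuccess();
        void onFailure(Exception firstError);
    }

    private final AtomicInteger remaining;
    private final AtomicReference<Exception> firstError = new AtomicReference<>();
    private final Completion target;

    public CountingCallback(int expected, Completion target) {
        if (expected <= 0) {
            throw new IllegalArgumentException("expected must be positive");
        }
        this.remaining = new AtomicInteger(expected);
        this.target = target;
    }

    /** Called by a listener that finished its part successfully. */
    public void succeeded() {
        finishOne();
    }

    /** Called by a listener that failed; only the first error is kept. */
    public void failed(Exception e) {
        firstError.compareAndSet(null, e);
        finishOne();
    }

    /** Fire the target exactly once, when the last listener has reported. */
    private void finishOne() {
        if (remaining.decrementAndGet() == 0) {
            Exception e = firstError.get();
            if (e == null) {
                target.onSuccess();
            } else {
                target.onFailure(e);
            }
        }
    }
}
```

With two listeners, the target stays silent after the first succeeded() call and fires on the second.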

If topic acquisition hits any error, it calls realReleaseTopic. Since 
realReleaseTopic runs asynchronously, the original callback is triggered 
immediately after realReleaseTopic is called, before the release has actually 
finished. So the topic manager may process an acquire and a release of the 
same topic at the same time.

Ivan's patch fixes this issue by triggering the original callback only after 
realReleaseTopic has actually finished.

{code}
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
+++ b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
@@ -135,10 +135,19 @@ public abstract class AbstractTopicManager implements TopicManager {
             }
 
             @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
+            public void operationFailed(final Object ctx, final PubSubException exception) {
                 // TODO: optimization: we can release this as soon as we experience the first error.
-                realReleaseTopic(topic, CallbackUtils.curry(originalCallback, addr), originalContext);
-                originalCallback.operationFailed(ctx, exception);
+                Callback<Void> cb = new Callback<Void>() {
+                    public void operationFinished(Object _ctx, Void _resultOfOperation) {
+                        originalCallback.operationFailed(ctx, exception);
+                    }
+                    public void operationFailed(Object _ctx, PubSubException _exception) {
+                        logger.error("Exception releasing topic", _exception);
+                        originalCallback.operationFailed(ctx, exception);
+                    }
+                };
+                
+                realReleaseTopic(topic, cb, originalContext);
             }
         };
{code}
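
The ordering difference can be reproduced in a small standalone sketch. Everything here is hypothetical (ReleaseOrderingSketch, asyncRelease, the deferred list standing in for asynchronous work); it is not Hedwig code. It also simplifies the pre-patch behaviour by handing the release a no-op callback instead of the curried one.

```java
import java.util.ArrayList;
import java.util.List;

public class ReleaseOrderingSketch {
    public final List<String> events = new ArrayList<>();
    // Work that "completes later"; stands in for the asynchronous release.
    private final List<Runnable> deferred = new ArrayList<>();

    /** Hypothetical stand-in for realReleaseTopic: returns at once, finishes later. */
    private void asyncRelease(Runnable onReleased) {
        deferred.add(() -> {
            events.add("release finished");
            onReleased.run();
        });
    }

    /** Pre-patch shape: the failure is reported right after the release is started. */
    public void failBeforePatch(Runnable originalFailed) {
        asyncRelease(() -> { });
        originalFailed.run();
    }

    /** Patched shape: the failure is reported from the release's own callback. */
    public void failAfterPatch(Runnable originalFailed) {
        asyncRelease(originalFailed);
    }

    /** Let all pending asynchronous work complete. */
    public void runDeferred() {
        List<Runnable> tasks = new ArrayList<>(deferred);
        deferred.clear();
        for (Runnable task : tasks) {
            task.run();
        }
    }

    public static void main(String[] args) {
        ReleaseOrderingSketch before = new ReleaseOrderingSketch();
        before.failBeforePatch(() -> before.events.add("original callback failed"));
        before.runDeferred();
        System.out.println(before.events); // [original callback failed, release finished]

        ReleaseOrderingSketch after = new ReleaseOrderingSketch();
        after.failAfterPatch(() -> after.events.add("original callback failed"));
        after.runDeferred();
        System.out.println(after.events); // [release finished, original callback failed]
    }
}
```

In the pre-patch shape the caller hears about the failure while the release is still pending, so the next queued op can start against a topic that is still being released.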

In realReleaseTopic, the listeners (SubscriptionManager/PersistenceManager) 
still handle losing the topic asynchronously. But this does not reintroduce the 
race, since operations in SubscriptionManager/PersistenceManager are still 
processed in a queue one by one, which means a newer acquire op will not be 
executed until the older release op has been executed.

{code}

    // AbstractTopicManager.java

    private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
        for (TopicOwnershipChangeListener listener : listeners)
            listener.lostTopic(topic);
        topics.remove(topic);
        postReleaseCleanup(topic, callback, ctx);
    }

{code}

So Ivan's patch does fix this issue. It is simple and clear. Thanks, Ivan.


                
> ServerRedirectLoopException when a machine (hosts bookie server & hub server) 
> reboot, which is caused by race condition of topic manager
> ----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-69
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-69
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-client, hedwig-server
>    Affects Versions: 4.0.0
>         Environment: 3 machines (perf8, perf9, perf10), each machine hosts a 
> bookie server & a hub server.
> perf8 is used as default server for client 1. perf9 is used as default server 
> for client 2.
> bookkeeper is configured as below:
> ensemble size is 3, quorum size is 2.
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>            Priority: Critical
>             Fix For: 4.0.0
>
>         Attachments: BOOKKEEPER-69.possiblefix.diff, 
> bookkeeper-69-testcase.patch, bookkeeper-69.patch, bookkeeper-69.patch
>
>
> 1) Machine perf10 is rebooted. The bookie server & hub server are not 
> restarted automatically after the reboot.
> 2) Client 1 & client 2 are still running. The topics owned by perf10 will be 
> re-assigned to perf8/perf9, but the re-assignment fails because not enough 
> bookie servers are available.
> 3) After 2 hours, we found that perf10 had been rebooted. We restarted the 
> bookie server & hub server on perf10.
> 4) Then we got ServerRedirectLoopException in the client.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: 
https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
