Author: ivank
Date: Mon Nov 26 17:47:54 2012
New Revision: 1413745

URL: http://svn.apache.org/viewvc?rev=1413745&view=rev
Log:
BOOKKEEPER-440: Make Write/Delete SubscriptionData Restricted to Version 
(Fangmin Lv via ivank)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
    
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 26 17:47:54 2012
@@ -146,6 +146,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-439: No more messages delivered after deleted consumed 
ledgers. (sijie via ivank)
 
+        BOOKKEEPER-440: Make Write/Delete SubscriptionData Restricted to 
Version (Fangmin Lv via ivank)
+
     IMPROVEMENTS:
 
       BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
 Mon Nov 26 17:47:54 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -44,6 +45,7 @@ import org.apache.hedwig.server.meta.Met
 import org.apache.hedwig.server.meta.SubscriptionDataManager;
 import org.apache.hedwig.server.meta.TopicOwnershipManager;
 import org.apache.hedwig.server.meta.TopicPersistenceManager;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
 import org.apache.hedwig.server.topics.HubInfo;
 import org.apache.hedwig.server.topics.HubLoad;
 import org.apache.hedwig.util.Callback;
@@ -457,10 +459,16 @@ public class HedwigAdmin {
 
         final SyncObj<Map<ByteString, SubscriptionData>> syncObj =
             new SyncObj<Map<ByteString, SubscriptionData>>();
-        sdm.readSubscriptions(topic, new Callback<Map<ByteString, 
SubscriptionData>>() {
+        sdm.readSubscriptions(topic, new Callback<Map<ByteString, 
Versioned<SubscriptionData>>>() {
             @Override
-            public void operationFinished(Object ctx, Map<ByteString, 
SubscriptionData> result) {
-                syncObj.success(result);
+            public void operationFinished(Object ctx, Map<ByteString, 
Versioned<SubscriptionData>> result) {
+                // It was just used to console tool to print some information, 
so don't need to return version for it
+                // just keep the getTopicSubscriptions interface as before
+                Map<ByteString, SubscriptionData> subs = new 
ConcurrentHashMap<ByteString, SubscriptionData>();
+                for (Map.Entry<ByteString, Versioned<SubscriptionData>> 
subEntry : result.entrySet()) {
+                    subs.put(subEntry.getKey(), 
subEntry.getValue().getValue());
+                }
+                syncObj.success(subs);
             }
             @Override
             public void operationFailed(Object ctx, PubSubException pse) {
@@ -489,10 +497,14 @@ public class HedwigAdmin {
      */
     public SubscriptionData getSubscription(ByteString topic, ByteString 
subscriber) throws Exception {
         final SyncObj<SubscriptionData> syncObj = new 
SyncObj<SubscriptionData>();
-        sdm.readSubscriptionData(topic, subscriber, new 
Callback<SubscriptionData>() {
+        sdm.readSubscriptionData(topic, subscriber, new 
Callback<Versioned<SubscriptionData>>() {
             @Override
-            public void operationFinished(Object ctx, SubscriptionData result) 
{
-                syncObj.success(result);
+            public void operationFinished(Object ctx, 
Versioned<SubscriptionData> result) {
+                if (null == result) {
+                    syncObj.success(null);
+                } else {
+                    syncObj.success(result.getValue());
+                }
             }
             @Override
             public void operationFailed(Object ctx, PubSubException pse) {

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
 Mon Nov 26 17:47:54 2012
@@ -22,6 +22,8 @@ import java.util.Map;
 
 import com.google.protobuf.ByteString;
 
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
@@ -42,14 +44,14 @@ public interface SubscriptionDataManager
      * @param data 
      *          Subscription data
      * @param callback
-     *          Callback when subscription state created.
+     *          Callback when subscription state created. New version would be 
returned.
      *          {@link PubSubException.SubscriptionStateExistsException} is 
returned when subscription state
      *          existed before.
      * @param ctx
      *          Context of the callback
      */
     public void createSubscriptionData(ByteString topic, ByteString 
subscriberId, SubscriptionData data,
-                                       Callback<Void> callback, Object ctx);
+                                       Callback<Version> callback, Object ctx);
 
     /**
      * Whether the metadata manager supports partial update.
@@ -71,15 +73,18 @@ public interface SubscriptionDataManager
      *          the part of data to update. The implementation should not 
replace
      *          existing subscription data with <i>dataToUpdate</i> directly.
      *          E.g. if there is only state in it, you should update state 
only.
+     * @param version
+     *          Current version of subscription data.
      * @param callback
-     *          Callback when subscription state updated.
+     *          Callback when subscription state updated. New version would be 
returned.
+     *          {@link PubSubException.BadVersionException} is returned when 
version doesn't match,
      *          {@link PubSubException.NoSubscriptionStateException} is 
returned when no subscription state
      *          is found.
      * @param ctx
      *          Context of the callback
      */
-    public void updateSubscriptionData(ByteString topic, ByteString 
subscriberId, SubscriptionData dataToUpdate,
-                                       Callback<Void> callback, Object ctx);
+    public void updateSubscriptionData(ByteString topic, ByteString 
subscriberId, SubscriptionData dataToUpdate, 
+                                       Version version, Callback<Version> 
callback, Object ctx);
 
     /**
      * Replace subscription data.
@@ -90,13 +95,18 @@ public interface SubscriptionDataManager
      *          Subscriber id
      * @param dataToReplace
      *          Subscription data to replace.
+     * @param version
+     *          Current version of subscription data.
      * @param callback
-     *          Callback when subscription state updated.
+     *          Callback when subscription state updated. New version would be 
returned.
+     *          {@link PubSubException.BadVersionException} is returned when 
version doesn't match,
+     *          {@link PubSubException.NoSubscriptionStateException} is 
returned when no subscription state
+     *          is found.
      * @param ctx
      *          Context of the callback
      */
     public void replaceSubscriptionData(ByteString topic, ByteString 
subscriberId, SubscriptionData dataToReplace,
-                                        Callback<Void> callback, Object ctx);
+                                        Version version, Callback<Version> 
callback, Object ctx);
 
     /**
      * Remove subscription data.
@@ -105,18 +115,21 @@ public interface SubscriptionDataManager
      *          Topic name
      * @param subscriberId
      *          Subscriber id
+     * @param version
+     *          Current version of subscription data.
      * @param callback
      *          Callback when subscription state deleted
+     *          {@link PubSubException.BadVersionException} is returned when 
version doesn't match,
      *          {@link PubSubException.NoSubscriptionStateException} is 
returned when no subscription state
      *          is found.
      * @param ctx
      *          Context of the callback
      */
-    public void deleteSubscriptionData(ByteString topic, ByteString 
subscriberId,
+    public void deleteSubscriptionData(ByteString topic, ByteString 
subscriberId, Version version,
                                        Callback<Void> callback, Object ctx);
 
     /**
-     * Read subscription data.
+     * Read subscription data with version.
      *
      * @param topic
      *          Topic Name
@@ -129,7 +142,7 @@ public interface SubscriptionDataManager
      *          Context of the callback
      */
     public void readSubscriptionData(ByteString topic, ByteString subscriberId,
-                                     Callback<SubscriptionData> callback, 
Object ctx);
+                                     Callback<Versioned<SubscriptionData>> 
callback, Object ctx);
 
     /**
      * Read all subscriptions of a topic.
@@ -137,10 +150,10 @@ public interface SubscriptionDataManager
      * @param topic
      *          Topic name
      * @param callback
-     *          Callback to return subscriptions
+     *          Callback to return subscriptions with version information
      * @param ctx
      *          Contxt of the callback
      */
-    public void readSubscriptions(ByteString topic, Callback<Map<ByteString, 
SubscriptionData>> cb,
+    public void readSubscriptions(ByteString topic, Callback<Map<ByteString, 
Versioned<SubscriptionData>>> cb,
                                   Object ctx);
 }

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
 Mon Nov 26 17:47:54 2012
@@ -37,6 +37,7 @@ import com.google.protobuf.ByteString;
 import com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.meta.ZkVersion;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
@@ -383,7 +384,7 @@ public class ZkMetadataManagerFactory ex
 
         @Override
         public void createSubscriptionData(final ByteString topic, final 
ByteString subscriberId, final SubscriptionData data,
-                                           final Callback<Void> callback, 
final Object ctx) {
+                                           final Callback<Version> callback, 
final Object ctx) {
             ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic, 
subscriberId), data.toByteArray(),
             Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new 
SafeAsyncZKCallback.StringCallback() {
 
@@ -401,7 +402,7 @@ public class ZkMetadataManagerFactory ex
                                          + " subscriberId: " + 
subscriberId.toStringUtf8() + " data: "
                                          + 
SubscriptionStateUtils.toString(data));
                         }
-                        callback.operationFinished(ctx, null);
+                        callback.operationFinished(ctx, new ZkVersion(0));
                     } else {
                         KeeperException ke = 
ZkUtils.logErrorAndCreateZKException(
                                                  "Could not record new 
subscription for topic: " + topic.toStringUtf8()
@@ -414,15 +415,30 @@ public class ZkMetadataManagerFactory ex
 
         @Override
         public void updateSubscriptionData(final ByteString topic, final 
ByteString subscriberId, final SubscriptionData data,
-                                           final Callback<Void> callback, 
final Object ctx) {
+                                           final Version version, final 
Callback<Version> callback, final Object ctx) {
             throw new UnsupportedOperationException("ZooKeeper based metadata 
manager doesn't support partial update!");
         }
 
         @Override
         public void replaceSubscriptionData(final ByteString topic, final 
ByteString subscriberId, final SubscriptionData data,
-                                            final Callback<Void> callback, 
final Object ctx) {
-            zk.setData(topicSubscriberPath(topic, subscriberId), 
data.toByteArray(), -1,
-            new SafeAsyncZKCallback.StatCallback() {
+                                            final Version version, final 
Callback<Version> callback, final Object ctx) {
+            int znodeVersion = -1;
+            if (Version.NEW == version) {
+                callback.operationFailed(ctx, 
+                        new PubSubException.BadVersionException("Can not 
replace Version.New subscription data"));
+                return;
+            } else if (Version.ANY != version) {
+                if (!(version instanceof ZkVersion)) {
+                    callback.operationFailed(ctx, new 
PubSubException.UnexpectedConditionException(
+                                                  "Invalid version provided to 
replace subscription data for topic  " 
+                                                  + topic.toStringUtf8() + " 
subscribe id: " + subscriberId));
+                    return;
+                } else {
+                    znodeVersion = ((ZkVersion)version).getZnodeVersion();
+                }
+            }
+            zk.setData(topicSubscriberPath(topic, subscriberId), 
data.toByteArray(), 
+                    znodeVersion, new SafeAsyncZKCallback.StatCallback() {
                 @Override
                 public void safeProcessResult(int rc, String path, Object ctx, 
Stat stat) {
                     if (rc == Code.NONODE.intValue()) {
@@ -431,6 +447,12 @@ public class ZkMetadataManagerFactory ex
                                                       "No subscription state 
found for (topic:" + topic.toStringUtf8() + ", subscriber:"
                                                       + 
subscriberId.toStringUtf8() + ")."));
                         return;
+                    } else if (rc == Code.BadVersion) {
+                        // bad version
+                        callback.operationFailed(ctx, 
PubSubException.create(StatusCode.BAD_VERSION,
+                                                      "Bad version provided to 
replace subscription data of topic " 
+                                                      + topic.toStringUtf8() + 
" subscriberId " + subscriberId));
+                        return;
                     } else if (rc != Code.OK.intValue()) {
                         KeeperException e = 
ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
                                             + " subscriberId: " + 
subscriberId.toStringUtf8()
@@ -444,16 +466,33 @@ public class ZkMetadataManagerFactory ex
                                          + 
SubscriptionStateUtils.toString(data));
                         }
 
-                        callback.operationFinished(ctx, null);
+                        callback.operationFinished(ctx, new 
ZkVersion(stat.getVersion()));
                     }
                 }
             }, ctx);
         }
 
         @Override
-        public void deleteSubscriptionData(final ByteString topic, final 
ByteString subscriberId,
+        public void deleteSubscriptionData(final ByteString topic, final 
ByteString subscriberId, Version version,
                                            final Callback<Void> callback, 
Object ctx) {
-            zk.delete(topicSubscriberPath(topic, subscriberId), -1, new 
SafeAsyncZKCallback.VoidCallback() {
+            
+            int znodeVersion = -1;
+            if (Version.NEW == version) {
+                callback.operationFailed(ctx, 
+                        new PubSubException.BadVersionException("Can not 
delete Version.New subscription data"));
+                return;
+            } else if (Version.ANY != version) {
+                if (!(version instanceof ZkVersion)) {
+                    callback.operationFailed(ctx, new 
PubSubException.UnexpectedConditionException(
+                                                  "Invalid version provided to 
delete subscription data for topic  " 
+                                                  + topic.toStringUtf8() + " 
subscribe id: " + subscriberId));
+                    return;
+                } else {
+                    znodeVersion = ((ZkVersion)version).getZnodeVersion();
+                }
+            }
+            
+            zk.delete(topicSubscriberPath(topic, subscriberId), znodeVersion, 
new SafeAsyncZKCallback.VoidCallback() {
                 @Override
                 public void safeProcessResult(int rc, String path, Object ctx) 
{
                     if (rc == Code.NONODE.intValue()) {
@@ -462,6 +501,12 @@ public class ZkMetadataManagerFactory ex
                                                       "No subscription state 
found for (topic:" + topic.toStringUtf8() + ", subscriber:"
                                                       + 
subscriberId.toStringUtf8() + ")."));
                         return;
+                    } else if (rc == Code.BadVersion) {
+                        // bad version
+                        callback.operationFailed(ctx, 
PubSubException.create(StatusCode.BAD_VERSION,
+                                                      "Bad version provided to 
delete subscription data of topic " 
+                                                      + topic.toStringUtf8() + 
" subscriberId " + subscriberId));
+                        return;
                     } else if (rc == Code.OK.intValue()) {
                         if (logger.isDebugEnabled()) {
                             logger.debug("Successfully deleted subscription 
for topic: " + topic.toStringUtf8()
@@ -481,7 +526,7 @@ public class ZkMetadataManagerFactory ex
 
         @Override
         public void readSubscriptionData(final ByteString topic, final 
ByteString subscriberId,
-                                         final Callback<SubscriptionData> 
callback, final Object ctx) {
+                                         final 
Callback<Versioned<SubscriptionData>> callback, final Object ctx) {
             zk.getData(topicSubscriberPath(topic, subscriberId), false, new 
SafeAsyncZKCallback.DataCallback() {
                 @Override
                 public void safeProcessResult(int rc, String path, Object ctx, 
byte[] data, Stat stat) {
@@ -496,10 +541,12 @@ public class ZkMetadataManagerFactory ex
                         callback.operationFailed(ctx, new 
PubSubException.ServiceDownException(e));
                         return;
                     }
-
-                    SubscriptionData subData;
+                    
+                    Versioned<SubscriptionData> subData;
                     try {
-                        subData = 
SubscriptionStateUtils.parseSubscriptionData(data);
+                        subData = new Versioned<SubscriptionData>(
+                                        
SubscriptionStateUtils.parseSubscriptionData(data), 
+                                        new ZkVersion(stat.getVersion()));
                     } catch (InvalidProtocolBufferException ex) {
                         String msg = "Failed to deserialize subscription data 
for topic: " + topic.toStringUtf8()
                                      + " subscriberId: " + 
subscriberId.toStringUtf8();
@@ -511,7 +558,7 @@ public class ZkMetadataManagerFactory ex
                     if (logger.isDebugEnabled()) {
                         logger.debug("Found subscription while acquiring 
topic: " + topic.toStringUtf8()
                                      + " subscriberId: " + 
subscriberId.toStringUtf8()
-                                     + " data: " + 
SubscriptionStateUtils.toString(subData));
+                                     + " data: " + 
SubscriptionStateUtils.toString(subData.getValue()));
                     }
                     callback.operationFinished(ctx, subData);
                 }
@@ -520,7 +567,7 @@ public class ZkMetadataManagerFactory ex
 
         @Override
         public void readSubscriptions(final ByteString topic,
-                                      final Callback<Map<ByteString, 
SubscriptionData>> cb, final Object ctx) {
+                                      final Callback<Map<ByteString, 
Versioned<SubscriptionData>>> cb, final Object ctx) {
             String topicSubscribersPath = topicSubscribersPath(new 
StringBuilder(), topic).toString();
             zk.getChildren(topicSubscribersPath, false, new 
SafeAsyncZKCallback.ChildrenCallback() {
                 @Override
@@ -533,7 +580,8 @@ public class ZkMetadataManagerFactory ex
                         return;
                     }
 
-                    final Map<ByteString, SubscriptionData> topicSubs = new 
ConcurrentHashMap<ByteString, SubscriptionData>();
+                    final Map<ByteString, Versioned<SubscriptionData>> 
topicSubs = 
+                            new ConcurrentHashMap<ByteString, 
Versioned<SubscriptionData>>();
 
                     if (rc == Code.NONODE.intValue() || children.size() == 0) {
                         if (logger.isDebugEnabled()) {
@@ -567,9 +615,11 @@ public class ZkMetadataManagerFactory ex
                                     return;
                                 }
 
-                                SubscriptionData subData;
+                                Versioned<SubscriptionData> subData;
                                 try {
-                                    subData = 
SubscriptionStateUtils.parseSubscriptionData(data);
+                                    subData = new Versioned<SubscriptionData>(
+                                            
SubscriptionStateUtils.parseSubscriptionData(data), 
+                                            new ZkVersion(stat.getVersion()));
                                 } catch (InvalidProtocolBufferException ex) {
                                     String msg = "Failed to deserialize 
subscription data for topic: " + topic.toStringUtf8()
                                                  + " subscriberId: " + 
subscriberId.toStringUtf8();
@@ -581,7 +631,7 @@ public class ZkMetadataManagerFactory ex
                                 if (logger.isDebugEnabled()) {
                                     logger.debug("Found subscription while 
acquiring topic: " + topic.toStringUtf8()
                                                  + " subscriberId: " + child + 
"state: "
-                                                 + 
SubscriptionStateUtils.toString(subData));
+                                                 + 
SubscriptionStateUtils.toString(subData.getValue()));
                                 }
 
                                 topicSubs.put(subscriberId, subData);

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
 Mon Nov 26 17:47:54 2012
@@ -31,6 +31,10 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
@@ -323,7 +327,10 @@ public abstract class AbstractSubscripti
 
     protected abstract void readSubscriptions(final ByteString topic,
             final Callback<Map<ByteString, InMemorySubscriptionState>> cb, 
final Object ctx);
-
+    
+    protected abstract void readSubscriptionData(final ByteString topic, final 
ByteString subscriberId, 
+            final Callback<InMemorySubscriptionState> cb, Object ctx);
+    
     private class SubscribeOp extends 
TopicOpQueuer.AsynchronousOp<SubscriptionData> {
         SubscribeRequest subRequest;
         MessageSeqId consumeSeqId;
@@ -428,21 +435,21 @@ public abstract class AbstractSubscripti
                 
SubscriptionData.newBuilder().setState(stateBuilder).setPreferences(preferencesBuilder);
             final SubscriptionData subData = subDataBuilder.build();
 
-            createSubscriptionData(topic, subscriberId, subData, new 
Callback<Void>() {
+            createSubscriptionData(topic, subscriberId, subData, new 
Callback<Version>() {
                 @Override
                 public void operationFailed(Object ctx, PubSubException 
exception) {
                     cb.operationFailed(ctx, exception);
                 }
 
                 @Override
-                public void operationFinished(Object ctx, Void 
resultOfOperation) {
+                public void operationFinished(Object ctx, final Version 
version) {
                     Callback<Void> cb2 = new Callback<Void>() {
                         @Override
                         public void operationFailed(final Object ctx, final 
PubSubException exception) {
                             logger.error("subscription for subscriber " + 
subscriberId.toStringUtf8() + " to topic "
                                          + topic.toStringUtf8() + " failed due 
to failed listener callback", exception);
                             // should remove subscription when synchronized 
cross-region subscription failed
-                            deleteSubscriptionData(topic, subscriberId, new 
Callback<Void>() {
+                            deleteSubscriptionData(topic, subscriberId, 
version, new Callback<Void>() {
                                 @Override
                                 public void operationFinished(Object context,
                                         Void resultOfOperation) {
@@ -463,7 +470,7 @@ public abstract class AbstractSubscripti
 
                         @Override
                         public void operationFinished(Object ctx, Void 
resultOfOperation) {
-                            topicSubscriptions.put(subscriberId, new 
InMemorySubscriptionState(subData));
+                            topicSubscriptions.put(subscriberId, new 
InMemorySubscriptionState(subData, version));
 
                             updateMessageBound(topic);
 
@@ -477,7 +484,7 @@ public abstract class AbstractSubscripti
                         && !hasLocalSubscriptions(topicSubscriptions))
                         notifyFirstLocalSubscribe(topic, 
subRequest.getSynchronous(), cb2, ctx);
                     else
-                        cb2.operationFinished(ctx, resultOfOperation);
+                        cb2.operationFinished(ctx, null);
                 }
             }, ctx);
         }
@@ -613,8 +620,9 @@ public abstract class AbstractSubscripti
                 cb.operationFailed(ctx, new 
PubSubException.ClientNotSubscribedException(""));
                 return;
             }
-
-            deleteSubscriptionData(topic, subscriberId, new Callback<Void>() {
+            
+            deleteSubscriptionData(topic, subscriberId, 
topicSubscriptions.get(subscriberId).getVersion(),
+                    new Callback<Void>() {
                 @Override
                 public void operationFailed(Object ctx, PubSubException 
exception) {
                     cb.operationFailed(ctx, exception);
@@ -679,44 +687,100 @@ public abstract class AbstractSubscripti
         }
     }
 
-    private void updateSubscriptionState(ByteString topic, ByteString 
subscriberId,
-                                         InMemorySubscriptionState state,
-                                         Callback<Void> callback, Object ctx) {
+    private void updateSubscriptionState(final ByteString topic, final 
ByteString subscriberId,
+                                         final InMemorySubscriptionState state,
+                                         final Callback<Void> callback, Object 
ctx) {
         SubscriptionData subData;
+        Callback<Version> cb = new Callback<Version>() {
+            @Override
+            public void operationFinished(Object ctx, Version version) {
+                state.setVersion(version);
+                callback.operationFinished(ctx, null);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) 
{
+                if (exception instanceof PubSubException.BadVersionException) {
+                    readSubscriptionData(topic, subscriberId, new 
Callback<InMemorySubscriptionState>() {
+                        @Override
+                        public void operationFinished(Object ctx,
+                                InMemorySubscriptionState resultOfOperation) {
+                            state.setVersion(resultOfOperation.getVersion());
+                            updateSubscriptionState(topic, subscriberId, 
state, callback, ctx);
+                        }
+                        @Override
+                        public void operationFailed(Object ctx,
+                                PubSubException exception) {
+                            callback.operationFailed(ctx, exception);
+                        }
+                    }, ctx);
+                    
+                    return;
+                } 
+                callback.operationFailed(ctx, exception);
+            }
+        };
         if (isPartialUpdateSupported()) {
             subData = 
SubscriptionData.newBuilder().setState(state.getSubscriptionState()).build();
-            updateSubscriptionData(topic, subscriberId, subData, callback, 
ctx);
+            updateSubscriptionData(topic, subscriberId, subData, 
state.getVersion(), cb, ctx);
         } else {
             subData = state.toSubscriptionData();
-            replaceSubscriptionData(topic, subscriberId, subData, callback, 
ctx);
+            replaceSubscriptionData(topic, subscriberId, subData, 
state.getVersion(), cb, ctx);
         }
     }
 
-    private void updateSubscriptionPreferences(ByteString topic, ByteString 
subscriberId,
-                                               InMemorySubscriptionState state,
-                                               Callback<Void> callback, Object 
ctx) {
+    private void updateSubscriptionPreferences(final ByteString topic, final 
ByteString subscriberId,
+                                               final InMemorySubscriptionState 
state,
+                                               final Callback<Void> callback, 
Object ctx) {
         SubscriptionData subData;
+        Callback<Version> cb = new Callback<Version>() {
+            @Override
+            public void operationFinished(Object ctx, Version version) {
+                state.setVersion(version);
+                callback.operationFinished(ctx, null);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) 
{
+                if (exception instanceof PubSubException.BadVersionException) {
+                    readSubscriptionData(topic, subscriberId, new 
Callback<InMemorySubscriptionState>() {
+                        @Override
+                        public void operationFinished(Object ctx,
+                                InMemorySubscriptionState resultOfOperation) {
+                            state.setVersion(resultOfOperation.getVersion());
+                            updateSubscriptionPreferences(topic, subscriberId, 
state, callback, ctx);
+                        }
+                        @Override
+                        public void operationFailed(Object ctx,
+                                PubSubException exception) {
+                            callback.operationFailed(ctx, exception);
+                        }
+                    }, ctx);
+                    
+                    return;
+                } 
+                callback.operationFailed(ctx, exception);
+            }
+        };
         if (isPartialUpdateSupported()) {
             subData = 
SubscriptionData.newBuilder().setPreferences(state.getSubscriptionPreferences()).build();
-            updateSubscriptionData(topic, subscriberId, subData, callback, 
ctx);
+            updateSubscriptionData(topic, subscriberId, subData, 
state.getVersion(), cb, ctx);
         } else {
             subData = state.toSubscriptionData();
-            replaceSubscriptionData(topic, subscriberId, subData, callback, 
ctx);
+            replaceSubscriptionData(topic, subscriberId, subData, 
state.getVersion(), cb, ctx);
         }
     }
 
     protected abstract boolean isPartialUpdateSupported();
 
     protected abstract void createSubscriptionData(final ByteString topic, 
ByteString subscriberId,
-            SubscriptionData data, Callback<Void> callback, Object ctx);
+            SubscriptionData data, Callback<Version> callback, Object ctx);
 
     protected abstract void updateSubscriptionData(ByteString topic, 
ByteString subscriberId, SubscriptionData data,
-            Callback<Void> callback, Object ctx);
+            Version version, Callback<Version> callback, Object ctx);
 
     protected abstract void replaceSubscriptionData(ByteString topic, 
ByteString subscriberId, SubscriptionData data,
-            Callback<Void> callback, Object ctx);
+            Version version, Callback<Version> callback, Object ctx);
 
-    protected abstract void deleteSubscriptionData(ByteString topic, 
ByteString subscriberId, Callback<Void> callback,
+    protected abstract void deleteSubscriptionData(ByteString topic, 
ByteString subscriberId, Version version, Callback<Void> callback,
             Object ctx);
 
 }

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
 Mon Nov 26 17:47:54 2012
@@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ScheduledExecutorService;
 
 import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.delivery.DeliveryManager;
@@ -43,13 +45,13 @@ public class InMemorySubscriptionManager
 
     @Override
     protected void createSubscriptionData(ByteString topic, ByteString 
subscriberId, SubscriptionData subData,
-                                           Callback<Void> callback, Object 
ctx) {
+                                           Callback<Version> callback, Object 
ctx) {
         // nothing to do, in-memory info is already recorded by base class
         callback.operationFinished(ctx, null);
     }
 
     @Override
-    protected void deleteSubscriptionData(ByteString topic, ByteString 
subscriberId, Callback<Void> callback,
+    protected void deleteSubscriptionData(ByteString topic, ByteString 
subscriberId, Version version, Callback<Void> callback,
                                           Object ctx) {
         // nothing to do, in-memory info is already deleted by base class
         callback.operationFinished(ctx, null);
@@ -62,13 +64,13 @@ public class InMemorySubscriptionManager
 
     @Override
     protected void updateSubscriptionData(ByteString topic, ByteString 
subscriberId, SubscriptionData data,
-                                          Callback<Void> callback, Object ctx) 
{
+                                          Version version, Callback<Version> 
callback, Object ctx) {
         throw new UnsupportedOperationException("Doesn't support partial 
update");
     }
 
     @Override
     protected void replaceSubscriptionData(ByteString topic, ByteString 
subscriberId, SubscriptionData data,
-                                           Callback<Void> callback, Object 
ctx) {
+                                           Version version, Callback<Version> 
callback, Object ctx) {
         // nothing to do, in-memory info is already updated by base class
         callback.operationFinished(ctx, null);
     }
@@ -100,4 +102,18 @@ public class InMemorySubscriptionManager
 
     }
 
+    @Override
+    protected void readSubscriptionData(ByteString topic,
+            ByteString subscriberId, Callback<InMemorySubscriptionState> cb, 
Object ctx) {
+        // Since we backed up in-memory information on lostTopic, we can just 
return that back
+        InMemorySubscriptionState subState = 
top2sub2seqBackup.get(topic).remove(subscriberId);
+        
+        if (subState != null) {
+            cb.operationFinished(ctx, subState);
+        } else {
+            cb.operationFinished(ctx, new InMemorySubscriptionState(
+                    SubscriptionData.getDefaultInstance(), Version.NEW));
+        }
+    }
+
 }

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
 Mon Nov 26 17:47:54 2012
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import com.google.protobuf.ByteString;
 
+import org.apache.bookkeeper.versioning.Version;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
@@ -33,8 +34,9 @@ public class InMemorySubscriptionState {
     SubscriptionState subscriptionState;
     SubscriptionPreferences subscriptionPreferences;
     MessageSeqId lastConsumeSeqId;
+       Version version;
 
-    public InMemorySubscriptionState(SubscriptionData subscriptionData, 
MessageSeqId lastConsumeSeqId) {
+    public InMemorySubscriptionState(SubscriptionData subscriptionData, 
Version version, MessageSeqId lastConsumeSeqId) {
         this.subscriptionState = subscriptionData.getState();
         if (subscriptionData.hasPreferences()) {
             this.subscriptionPreferences = subscriptionData.getPreferences();
@@ -47,10 +49,11 @@ public class InMemorySubscriptionState {
 
         }
         this.lastConsumeSeqId = lastConsumeSeqId;
+        this.version = version;
     }
 
-    public InMemorySubscriptionState(SubscriptionData subscriptionData) {
-        this(subscriptionData, subscriptionData.getState().getMsgId());
+    public InMemorySubscriptionState(SubscriptionData subscriptionData, 
Version version) {
+        this(subscriptionData, version, 
subscriptionData.getState().getMsgId());
     }
 
     public SubscriptionData toSubscriptionData() {
@@ -72,6 +75,14 @@ public class InMemorySubscriptionState {
     public MessageSeqId getLastConsumeSeqId() {
         return lastConsumeSeqId;
     }
+     
+    public Version getVersion() {
+        return version;
+    }
+    
+    public void setVersion(Version version) {
+        this.version = version;
+    }
 
     /**
      *

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
 Mon Nov 26 17:47:54 2012
@@ -32,6 +32,8 @@ import org.apache.hedwig.server.meta.Sub
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
 
 /**
  * MetaManager-based subscription manager.
@@ -52,16 +54,17 @@ public class MMSubscriptionManager exten
     @Override
     protected void readSubscriptions(final ByteString topic,
                                      final Callback<Map<ByteString, 
InMemorySubscriptionState>> cb, final Object ctx) {
-        subManager.readSubscriptions(topic, new Callback<Map<ByteString, 
SubscriptionData>>() {
+        subManager.readSubscriptions(topic, new Callback<Map<ByteString, 
Versioned<SubscriptionData>>>() {
             @Override
             public void operationFailed(Object ctx, PubSubException pse) {
                 cb.operationFailed(ctx, pse);
             }
             @Override
-            public void operationFinished(Object ctx, Map<ByteString, 
SubscriptionData> subs) {
+            public void operationFinished(Object ctx, Map<ByteString, 
Versioned<SubscriptionData>> subs) {
                 Map<ByteString, InMemorySubscriptionState> results = new 
ConcurrentHashMap<ByteString, InMemorySubscriptionState>();
-                for (Map.Entry<ByteString, SubscriptionData> subEntry : 
subs.entrySet()) {
-                    results.put(subEntry.getKey(), new 
InMemorySubscriptionState(subEntry.getValue()));
+                for (Map.Entry<ByteString, Versioned<SubscriptionData>> 
subEntry : subs.entrySet()) {
+                    Versioned<SubscriptionData> vv = subEntry.getValue();
+                    results.put(subEntry.getKey(), new 
InMemorySubscriptionState(vv.getValue(), vv.getVersion()));
                 }
                 cb.operationFinished(ctx, results);
             }
@@ -69,32 +72,54 @@ public class MMSubscriptionManager exten
     }
 
     @Override
+    protected void readSubscriptionData(final ByteString topic, final 
ByteString subscriberId,
+                                        final 
Callback<InMemorySubscriptionState> cb, final Object ctx) {
+        subManager.readSubscriptionData(topic, subscriberId, new 
Callback<Versioned<SubscriptionData>>() {
+            @Override
+            public void operationFinished(Object ctx,
+                    Versioned<SubscriptionData> subData) {
+                if (null != subData) {
+                    cb.operationFinished(ctx, 
+                            new InMemorySubscriptionState(subData.getValue(), 
subData.getVersion()));
+                } else {
+                    cb.operationFinished(ctx, new InMemorySubscriptionState(
+                            SubscriptionData.getDefaultInstance(), 
Version.NEW));
+                }
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) 
{
+                cb.operationFailed(ctx, exception);
+            }
+        }, ctx);
+    }
+
+    @Override
     protected boolean isPartialUpdateSupported() {
         return subManager.isPartialUpdateSupported();
     }
 
     @Override
     protected void createSubscriptionData(final ByteString topic, final 
ByteString subscriberId,
-                                          final SubscriptionData subData, 
final Callback<Void> callback, final Object ctx) {
+                                          final SubscriptionData subData, 
final Callback<Version> callback, final Object ctx) {
         subManager.createSubscriptionData(topic, subscriberId, subData, 
callback, ctx);
     }
 
     @Override
-    protected void replaceSubscriptionData(final ByteString topic, final 
ByteString subscriberId,
-                                           final SubscriptionData subData, 
final Callback<Void> callback, final Object ctx) {
-        subManager.replaceSubscriptionData(topic, subscriberId, subData, 
callback, ctx);
+    protected void replaceSubscriptionData(final ByteString topic, final 
ByteString subscriberId, final SubscriptionData subData, 
+                                           final Version version, final 
Callback<Version> callback, final Object ctx) {
+        subManager.replaceSubscriptionData(topic, subscriberId, subData, 
version, callback, ctx);
     }
 
     @Override
-    protected void updateSubscriptionData(final ByteString topic, final 
ByteString subscriberId,
-                                          final SubscriptionData subData, 
final Callback<Void> callback, final Object ctx) {
-        subManager.updateSubscriptionData(topic, subscriberId, subData, 
callback, ctx);
+    protected void updateSubscriptionData(final ByteString topic, final 
ByteString subscriberId, final SubscriptionData subData, 
+                                          final Version version, final 
Callback<Version> callback, final Object ctx) {
+        subManager.updateSubscriptionData(topic, subscriberId, subData, 
version, callback, ctx);
     }
 
     @Override
-    protected void deleteSubscriptionData(final ByteString topic, final 
ByteString subscriberId,
+    protected void deleteSubscriptionData(final ByteString topic, final 
ByteString subscriberId, Version version,
                                           final Callback<Void> callback, final 
Object ctx) {
-        subManager.deleteSubscriptionData(topic, subscriberId, callback, ctx);
+        subManager.deleteSubscriptionData(topic, subscriberId, version, 
callback, ctx);
     }
 
     @Override

Modified: 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
URL: 
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
 (original)
+++ 
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
 Mon Nov 26 17:47:54 2012
@@ -21,7 +21,6 @@
 package org.apache.hedwig.server.meta;
 
 import java.util.Map;
-import java.util.concurrent.SynchronousQueue;
 
 import com.google.protobuf.ByteString;
 
@@ -35,7 +34,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
 import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.util.Callback;
 import org.apache.hedwig.util.Either;
 import org.apache.hedwig.util.HedwigSocketAddress;
 
@@ -234,35 +233,46 @@ public class TestMetadataManager extends
         ByteString topic = ByteString.copyFromUtf8("testSubscriptionData");
         ByteString subid = ByteString.copyFromUtf8("mysub");
 
-        StubCallback<Void> callback = new StubCallback<Void>();
-        StubCallback<SubscriptionData> readCallback = new 
StubCallback<SubscriptionData>();
-        StubCallback<Map<ByteString, SubscriptionData>> subsCallback
-            = new StubCallback<Map<ByteString, SubscriptionData>>();
+        final StubCallback<Version> callback = new StubCallback<Version>();
+        StubCallback<Versioned<SubscriptionData>> readCallback = new 
StubCallback<Versioned<SubscriptionData>>();
+        StubCallback<Map<ByteString, Versioned<SubscriptionData>>> subsCallback
+            = new StubCallback<Map<ByteString, Versioned<SubscriptionData>>>();
 
         subManager.readSubscriptionData(topic, subid, readCallback, null);
-        Either<SubscriptionData, PubSubException> readRes = 
readCallback.queue.take();
+        Either<Versioned<SubscriptionData>, PubSubException> readRes = 
readCallback.queue.take();
         Assert.assertEquals("Found inconsistent subscription state", null, 
readRes.left());
         Assert.assertEquals("Should not fail with PubSubException", null, 
readRes.right());
 
         // read non-existed subscription state
         subManager.readSubscriptions(topic, subsCallback, null);
-        Either<Map<ByteString, SubscriptionData>, PubSubException> res = 
subsCallback.queue.take();
+        Either<Map<ByteString, Versioned<SubscriptionData>>, PubSubException> 
res = subsCallback.queue.take();
         Assert.assertEquals("Found more than 0 subscribers", 0, 
res.left().size());
         Assert.assertEquals("Should not fail with PubSubException", null, 
res.right());
 
         // update non-existed subscription state
         if (subManager.isPartialUpdateSupported()) {
-            subManager.updateSubscriptionData(topic, subid, 
SubscriptionData.getDefaultInstance(),
-                                              callback, null);
+            subManager.updateSubscriptionData(topic, subid, 
+                    SubscriptionData.getDefaultInstance(), Version.ANY, 
callback, null);
         } else {
-            subManager.replaceSubscriptionData(topic, subid, 
SubscriptionData.getDefaultInstance(),
-                                               callback, null);
+            subManager.replaceSubscriptionData(topic, subid, 
+                    SubscriptionData.getDefaultInstance(), Version.ANY, 
callback, null);
         }
         Assert.assertTrue("Should fail to update a non-existed subscriber with 
PubSubException",
                           callback.queue.take().right() instanceof 
PubSubException.NoSubscriptionStateException);
 
+        Callback<Void> voidCallback = new Callback<Void>() {
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                callback.operationFinished(ctx, null);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) 
{
+                callback.operationFailed(ctx, exception);
+            }
+        }; 
+        
         // delete non-existed subscription state
-        subManager.deleteSubscriptionData(topic, subid, callback, null);
+        subManager.deleteSubscriptionData(topic, subid, Version.ANY, 
voidCallback, null);
         Assert.assertTrue("Should fail to delete a non-existed subscriber with 
PubSubException",
                           callback.queue.take().right() instanceof 
PubSubException.NoSubscriptionStateException);
 
@@ -276,15 +286,19 @@ public class TestMetadataManager extends
 
         // create a subscription state
         subManager.createSubscriptionData(topic, subid, data, callback, null);
+        Either<Version, PubSubException> cbResult = callback.queue.take();
+        Version v1 = cbResult.left();
         Assert.assertEquals("Should not fail with PubSubException",
-                            null, callback.queue.take().right());
+                            null, cbResult.right());
 
         // read subscriptions
         subManager.readSubscriptions(topic, subsCallback, null);
         res = subsCallback.queue.take();
         Assert.assertEquals("Should find just 1 subscriber", 1, 
res.left().size());
         Assert.assertEquals("Should not fail with PubSubException", null, 
res.right());
-        SubscriptionData imss = res.left().get(subid);
+        Versioned<SubscriptionData> versionedSubData = res.left().get(subid);
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v1.compare(versionedSubData.getVersion()));
+        SubscriptionData imss = versionedSubData.getValue();
         Assert.assertEquals("Found inconsistent subscription state",
                             data, imss);
         Assert.assertEquals("Found inconsistent last consumed seq id",
@@ -298,32 +312,46 @@ public class TestMetadataManager extends
 
         stateBuilder = 
SubscriptionState.newBuilder(data.getState()).setMsgId(msgId);
         data = SubscriptionData.newBuilder().setState(stateBuilder).build();
-
+        
         // update subscription state
         if (subManager.isPartialUpdateSupported()) {
-            subManager.updateSubscriptionData(topic, subid, data, callback, 
null);
+            subManager.updateSubscriptionData(topic, subid, data, 
versionedSubData.getVersion(), callback, null);
         } else {
-            subManager.replaceSubscriptionData(topic, subid, data, callback, 
null);
+            subManager.replaceSubscriptionData(topic, subid, data, 
versionedSubData.getVersion(), callback, null);
         }
-        Assert.assertEquals("Fail to update a subscription state", null, 
callback.queue.take().right());
-
+        cbResult = callback.queue.take();
+        Assert.assertEquals("Fail to update a subscription state", null, 
cbResult.right());
+        Version v2 = cbResult.left();
         // read subscription state
         subManager.readSubscriptionData(topic, subid, readCallback, null);
         Assert.assertEquals("Found inconsistent subscription state",
-                            data, readCallback.queue.take().left());
-
+                            data, readCallback.queue.take().left().getValue());
+        
         // read subscriptions again
         subManager.readSubscriptions(topic, subsCallback, null);
         res = subsCallback.queue.take();
         Assert.assertEquals("Should find just 1 subscriber", 1, 
res.left().size());
         Assert.assertEquals("Should not fail with PubSubException", null, 
res.right());
-        imss = res.left().get(subid);
+        versionedSubData = res.left().get(subid);
+        Assert.assertEquals(Version.Occurred.CONCURRENTLY, 
v2.compare(versionedSubData.getVersion()));
+        imss = res.left().get(subid).getValue();
         Assert.assertEquals("Found inconsistent subscription state",
                             data, imss);
         Assert.assertEquals("Found inconsistent last consumed seq id",
                             seqId, 
imss.getState().getMsgId().getLocalComponent());
 
-        subManager.deleteSubscriptionData(topic, subid, callback, null);
+        // update or replace subscription data with bad version
+        if (subManager.isPartialUpdateSupported()) {
+            subManager.updateSubscriptionData(topic, subid, data, v1, 
callback, null);
+        } else {
+            subManager.replaceSubscriptionData(topic, subid, data, v1, 
callback, null);
+        }
+        Assert.assertTrue(callback.queue.take().right() instanceof 
PubSubException.BadVersionException);
+        
+        // delete with bad version
+        subManager.deleteSubscriptionData(topic, subid, v1, voidCallback, 
null);
+        Assert.assertTrue(callback.queue.take().right() instanceof 
PubSubException.BadVersionException);
+        subManager.deleteSubscriptionData(topic, subid, 
res.left().get(subid).getVersion(), voidCallback, null);
         Assert.assertEquals("Fail to delete an existed subscriber", null, 
callback.queue.take().right());
 
         // read subscription states again


Reply via email to