Author: ivank
Date: Mon Nov 26 17:47:54 2012
New Revision: 1413745
URL: http://svn.apache.org/viewvc?rev=1413745&view=rev
Log:
BOOKKEEPER-440: Make Write/Delete SubscriptionData Restricted to Version
(Fangmin Lv via ivank)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Nov 26 17:47:54 2012
@@ -146,6 +146,8 @@ Trunk (unreleased changes)
BOOKKEEPER-439: No more messages delivered after deleted consumed
ledgers. (sijie via ivank)
+ BOOKKEEPER-440: Make Write/Delete SubscriptionData Restricted to
Version (Fangmin Lv via ivank)
+
IMPROVEMENTS:
BOOKKEEPER-467: Allocate ports for testing dynamically (ivank)
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/admin/HedwigAdmin.java
Mon Nov 26 17:47:54 2012
@@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -44,6 +45,7 @@ import org.apache.hedwig.server.meta.Met
import org.apache.hedwig.server.meta.SubscriptionDataManager;
import org.apache.hedwig.server.meta.TopicOwnershipManager;
import org.apache.hedwig.server.meta.TopicPersistenceManager;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
import org.apache.hedwig.server.topics.HubInfo;
import org.apache.hedwig.server.topics.HubLoad;
import org.apache.hedwig.util.Callback;
@@ -457,10 +459,16 @@ public class HedwigAdmin {
final SyncObj<Map<ByteString, SubscriptionData>> syncObj =
new SyncObj<Map<ByteString, SubscriptionData>>();
- sdm.readSubscriptions(topic, new Callback<Map<ByteString,
SubscriptionData>>() {
+ sdm.readSubscriptions(topic, new Callback<Map<ByteString,
Versioned<SubscriptionData>>>() {
@Override
- public void operationFinished(Object ctx, Map<ByteString,
SubscriptionData> result) {
- syncObj.success(result);
+ public void operationFinished(Object ctx, Map<ByteString,
Versioned<SubscriptionData>> result) {
+ // It was just used to console tool to print some information,
so don't need to return version for it
+ // just keep the getTopicSubscriptions interface as before
+ Map<ByteString, SubscriptionData> subs = new
ConcurrentHashMap<ByteString, SubscriptionData>();
+ for (Map.Entry<ByteString, Versioned<SubscriptionData>>
subEntry : result.entrySet()) {
+ subs.put(subEntry.getKey(),
subEntry.getValue().getValue());
+ }
+ syncObj.success(subs);
}
@Override
public void operationFailed(Object ctx, PubSubException pse) {
@@ -489,10 +497,14 @@ public class HedwigAdmin {
*/
public SubscriptionData getSubscription(ByteString topic, ByteString
subscriber) throws Exception {
final SyncObj<SubscriptionData> syncObj = new
SyncObj<SubscriptionData>();
- sdm.readSubscriptionData(topic, subscriber, new
Callback<SubscriptionData>() {
+ sdm.readSubscriptionData(topic, subscriber, new
Callback<Versioned<SubscriptionData>>() {
@Override
- public void operationFinished(Object ctx, SubscriptionData result)
{
- syncObj.success(result);
+ public void operationFinished(Object ctx,
Versioned<SubscriptionData> result) {
+ if (null == result) {
+ syncObj.success(null);
+ } else {
+ syncObj.success(result.getValue());
+ }
}
@Override
public void operationFailed(Object ctx, PubSubException pse) {
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/SubscriptionDataManager.java
Mon Nov 26 17:47:54 2012
@@ -22,6 +22,8 @@ import java.util.Map;
import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.server.subscriptions.InMemorySubscriptionState;
@@ -42,14 +44,14 @@ public interface SubscriptionDataManager
* @param data
* Subscription data
* @param callback
- * Callback when subscription state created.
+ * Callback when subscription state created. New version would be
returned.
* {@link PubSubException.SubscriptionStateExistsException} is
returned when subscription state
* existed before.
* @param ctx
* Context of the callback
*/
public void createSubscriptionData(ByteString topic, ByteString
subscriberId, SubscriptionData data,
- Callback<Void> callback, Object ctx);
+ Callback<Version> callback, Object ctx);
/**
* Whether the metadata manager supports partial update.
@@ -71,15 +73,18 @@ public interface SubscriptionDataManager
* the part of data to update. The implementation should not
replace
* existing subscription data with <i>dataToUpdate</i> directly.
* E.g. if there is only state in it, you should update state
only.
+ * @param version
+ * Current version of subscription data.
* @param callback
- * Callback when subscription state updated.
+ * Callback when subscription state updated. New version would be
returned.
+ * {@link PubSubException.BadVersionException} is returned when
version doesn't match,
* {@link PubSubException.NoSubscriptionStateException} is
returned when no subscription state
* is found.
* @param ctx
* Context of the callback
*/
- public void updateSubscriptionData(ByteString topic, ByteString
subscriberId, SubscriptionData dataToUpdate,
- Callback<Void> callback, Object ctx);
+ public void updateSubscriptionData(ByteString topic, ByteString
subscriberId, SubscriptionData dataToUpdate,
+ Version version, Callback<Version>
callback, Object ctx);
/**
* Replace subscription data.
@@ -90,13 +95,18 @@ public interface SubscriptionDataManager
* Subscriber id
* @param dataToReplace
* Subscription data to replace.
+ * @param version
+ * Current version of subscription data.
* @param callback
- * Callback when subscription state updated.
+ * Callback when subscription state updated. New version would be
returned.
+ * {@link PubSubException.BadVersionException} is returned when
version doesn't match,
+ * {@link PubSubException.NoSubscriptionStateException} is
returned when no subscription state
+ * is found.
* @param ctx
* Context of the callback
*/
public void replaceSubscriptionData(ByteString topic, ByteString
subscriberId, SubscriptionData dataToReplace,
- Callback<Void> callback, Object ctx);
+ Version version, Callback<Version>
callback, Object ctx);
/**
* Remove subscription data.
@@ -105,18 +115,21 @@ public interface SubscriptionDataManager
* Topic name
* @param subscriberId
* Subscriber id
+ * @param version
+ * Current version of subscription data.
* @param callback
* Callback when subscription state deleted
+ * {@link PubSubException.BadVersionException} is returned when
version doesn't match,
* {@link PubSubException.NoSubscriptionStateException} is
returned when no subscription state
* is found.
* @param ctx
* Context of the callback
*/
- public void deleteSubscriptionData(ByteString topic, ByteString
subscriberId,
+ public void deleteSubscriptionData(ByteString topic, ByteString
subscriberId, Version version,
Callback<Void> callback, Object ctx);
/**
- * Read subscription data.
+ * Read subscription data with version.
*
* @param topic
* Topic Name
@@ -129,7 +142,7 @@ public interface SubscriptionDataManager
* Context of the callback
*/
public void readSubscriptionData(ByteString topic, ByteString subscriberId,
- Callback<SubscriptionData> callback,
Object ctx);
+ Callback<Versioned<SubscriptionData>>
callback, Object ctx);
/**
* Read all subscriptions of a topic.
@@ -137,10 +150,10 @@ public interface SubscriptionDataManager
* @param topic
* Topic name
* @param callback
- * Callback to return subscriptions
+ * Callback to return subscriptions with version information
* @param ctx
* Contxt of the callback
*/
- public void readSubscriptions(ByteString topic, Callback<Map<ByteString,
SubscriptionData>> cb,
+ public void readSubscriptions(ByteString topic, Callback<Map<ByteString,
Versioned<SubscriptionData>>> cb,
Object ctx);
}
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/meta/ZkMetadataManagerFactory.java
Mon Nov 26 17:47:54 2012
@@ -37,6 +37,7 @@ import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges;
@@ -383,7 +384,7 @@ public class ZkMetadataManagerFactory ex
@Override
public void createSubscriptionData(final ByteString topic, final
ByteString subscriberId, final SubscriptionData data,
- final Callback<Void> callback,
final Object ctx) {
+ final Callback<Version> callback,
final Object ctx) {
ZkUtils.createFullPathOptimistic(zk, topicSubscriberPath(topic,
subscriberId), data.toByteArray(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new
SafeAsyncZKCallback.StringCallback() {
@@ -401,7 +402,7 @@ public class ZkMetadataManagerFactory ex
+ " subscriberId: " +
subscriberId.toStringUtf8() + " data: "
+
SubscriptionStateUtils.toString(data));
}
- callback.operationFinished(ctx, null);
+ callback.operationFinished(ctx, new ZkVersion(0));
} else {
KeeperException ke =
ZkUtils.logErrorAndCreateZKException(
"Could not record new
subscription for topic: " + topic.toStringUtf8()
@@ -414,15 +415,30 @@ public class ZkMetadataManagerFactory ex
@Override
public void updateSubscriptionData(final ByteString topic, final
ByteString subscriberId, final SubscriptionData data,
- final Callback<Void> callback,
final Object ctx) {
+ final Version version, final
Callback<Version> callback, final Object ctx) {
throw new UnsupportedOperationException("ZooKeeper based metadata
manager doesn't support partial update!");
}
@Override
public void replaceSubscriptionData(final ByteString topic, final
ByteString subscriberId, final SubscriptionData data,
- final Callback<Void> callback,
final Object ctx) {
- zk.setData(topicSubscriberPath(topic, subscriberId),
data.toByteArray(), -1,
- new SafeAsyncZKCallback.StatCallback() {
+ final Version version, final
Callback<Version> callback, final Object ctx) {
+ int znodeVersion = -1;
+ if (Version.NEW == version) {
+ callback.operationFailed(ctx,
+ new PubSubException.BadVersionException("Can not
replace Version.New subscription data"));
+ return;
+ } else if (Version.ANY != version) {
+ if (!(version instanceof ZkVersion)) {
+ callback.operationFailed(ctx, new
PubSubException.UnexpectedConditionException(
+ "Invalid version provided to
replace subscription data for topic "
+ + topic.toStringUtf8() + "
subscribe id: " + subscriberId));
+ return;
+ } else {
+ znodeVersion = ((ZkVersion)version).getZnodeVersion();
+ }
+ }
+ zk.setData(topicSubscriberPath(topic, subscriberId),
data.toByteArray(),
+ znodeVersion, new SafeAsyncZKCallback.StatCallback() {
@Override
public void safeProcessResult(int rc, String path, Object ctx,
Stat stat) {
if (rc == Code.NONODE.intValue()) {
@@ -431,6 +447,12 @@ public class ZkMetadataManagerFactory ex
"No subscription state
found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+
subscriberId.toStringUtf8() + ")."));
return;
+ } else if (rc == Code.BadVersion) {
+ // bad version
+ callback.operationFailed(ctx,
PubSubException.create(StatusCode.BAD_VERSION,
+ "Bad version provided to
replace subscription data of topic "
+ + topic.toStringUtf8() +
" subscriberId " + subscriberId));
+ return;
} else if (rc != Code.OK.intValue()) {
KeeperException e =
ZkUtils.logErrorAndCreateZKException("Topic: " + topic.toStringUtf8()
+ " subscriberId: " +
subscriberId.toStringUtf8()
@@ -444,16 +466,33 @@ public class ZkMetadataManagerFactory ex
+
SubscriptionStateUtils.toString(data));
}
- callback.operationFinished(ctx, null);
+ callback.operationFinished(ctx, new
ZkVersion(stat.getVersion()));
}
}
}, ctx);
}
@Override
- public void deleteSubscriptionData(final ByteString topic, final
ByteString subscriberId,
+ public void deleteSubscriptionData(final ByteString topic, final
ByteString subscriberId, Version version,
final Callback<Void> callback,
Object ctx) {
- zk.delete(topicSubscriberPath(topic, subscriberId), -1, new
SafeAsyncZKCallback.VoidCallback() {
+
+ int znodeVersion = -1;
+ if (Version.NEW == version) {
+ callback.operationFailed(ctx,
+ new PubSubException.BadVersionException("Can not
delete Version.New subscription data"));
+ return;
+ } else if (Version.ANY != version) {
+ if (!(version instanceof ZkVersion)) {
+ callback.operationFailed(ctx, new
PubSubException.UnexpectedConditionException(
+ "Invalid version provided to
delete subscription data for topic "
+ + topic.toStringUtf8() + "
subscribe id: " + subscriberId));
+ return;
+ } else {
+ znodeVersion = ((ZkVersion)version).getZnodeVersion();
+ }
+ }
+
+ zk.delete(topicSubscriberPath(topic, subscriberId), znodeVersion,
new SafeAsyncZKCallback.VoidCallback() {
@Override
public void safeProcessResult(int rc, String path, Object ctx)
{
if (rc == Code.NONODE.intValue()) {
@@ -462,6 +501,12 @@ public class ZkMetadataManagerFactory ex
"No subscription state
found for (topic:" + topic.toStringUtf8() + ", subscriber:"
+
subscriberId.toStringUtf8() + ")."));
return;
+ } else if (rc == Code.BadVersion) {
+ // bad version
+ callback.operationFailed(ctx,
PubSubException.create(StatusCode.BAD_VERSION,
+ "Bad version provided to
delete subscription data of topic "
+ + topic.toStringUtf8() +
" subscriberId " + subscriberId));
+ return;
} else if (rc == Code.OK.intValue()) {
if (logger.isDebugEnabled()) {
logger.debug("Successfully deleted subscription
for topic: " + topic.toStringUtf8()
@@ -481,7 +526,7 @@ public class ZkMetadataManagerFactory ex
@Override
public void readSubscriptionData(final ByteString topic, final
ByteString subscriberId,
- final Callback<SubscriptionData>
callback, final Object ctx) {
+ final
Callback<Versioned<SubscriptionData>> callback, final Object ctx) {
zk.getData(topicSubscriberPath(topic, subscriberId), false, new
SafeAsyncZKCallback.DataCallback() {
@Override
public void safeProcessResult(int rc, String path, Object ctx,
byte[] data, Stat stat) {
@@ -496,10 +541,12 @@ public class ZkMetadataManagerFactory ex
callback.operationFailed(ctx, new
PubSubException.ServiceDownException(e));
return;
}
-
- SubscriptionData subData;
+
+ Versioned<SubscriptionData> subData;
try {
- subData =
SubscriptionStateUtils.parseSubscriptionData(data);
+ subData = new Versioned<SubscriptionData>(
+
SubscriptionStateUtils.parseSubscriptionData(data),
+ new ZkVersion(stat.getVersion()));
} catch (InvalidProtocolBufferException ex) {
String msg = "Failed to deserialize subscription data
for topic: " + topic.toStringUtf8()
+ " subscriberId: " +
subscriberId.toStringUtf8();
@@ -511,7 +558,7 @@ public class ZkMetadataManagerFactory ex
if (logger.isDebugEnabled()) {
logger.debug("Found subscription while acquiring
topic: " + topic.toStringUtf8()
+ " subscriberId: " +
subscriberId.toStringUtf8()
- + " data: " +
SubscriptionStateUtils.toString(subData));
+ + " data: " +
SubscriptionStateUtils.toString(subData.getValue()));
}
callback.operationFinished(ctx, subData);
}
@@ -520,7 +567,7 @@ public class ZkMetadataManagerFactory ex
@Override
public void readSubscriptions(final ByteString topic,
- final Callback<Map<ByteString,
SubscriptionData>> cb, final Object ctx) {
+ final Callback<Map<ByteString,
Versioned<SubscriptionData>>> cb, final Object ctx) {
String topicSubscribersPath = topicSubscribersPath(new
StringBuilder(), topic).toString();
zk.getChildren(topicSubscribersPath, false, new
SafeAsyncZKCallback.ChildrenCallback() {
@Override
@@ -533,7 +580,8 @@ public class ZkMetadataManagerFactory ex
return;
}
- final Map<ByteString, SubscriptionData> topicSubs = new
ConcurrentHashMap<ByteString, SubscriptionData>();
+ final Map<ByteString, Versioned<SubscriptionData>>
topicSubs =
+ new ConcurrentHashMap<ByteString,
Versioned<SubscriptionData>>();
if (rc == Code.NONODE.intValue() || children.size() == 0) {
if (logger.isDebugEnabled()) {
@@ -567,9 +615,11 @@ public class ZkMetadataManagerFactory ex
return;
}
- SubscriptionData subData;
+ Versioned<SubscriptionData> subData;
try {
- subData =
SubscriptionStateUtils.parseSubscriptionData(data);
+ subData = new Versioned<SubscriptionData>(
+
SubscriptionStateUtils.parseSubscriptionData(data),
+ new ZkVersion(stat.getVersion()));
} catch (InvalidProtocolBufferException ex) {
String msg = "Failed to deserialize
subscription data for topic: " + topic.toStringUtf8()
+ " subscriberId: " +
subscriberId.toStringUtf8();
@@ -581,7 +631,7 @@ public class ZkMetadataManagerFactory ex
if (logger.isDebugEnabled()) {
logger.debug("Found subscription while
acquiring topic: " + topic.toStringUtf8()
+ " subscriberId: " + child +
"state: "
- +
SubscriptionStateUtils.toString(subData));
+ +
SubscriptionStateUtils.toString(subData.getValue()));
}
topicSubs.put(subscriberId, subData);
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
Mon Nov 26 17:47:54 2012
@@ -31,6 +31,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
@@ -323,7 +327,10 @@ public abstract class AbstractSubscripti
protected abstract void readSubscriptions(final ByteString topic,
final Callback<Map<ByteString, InMemorySubscriptionState>> cb,
final Object ctx);
-
+
+ protected abstract void readSubscriptionData(final ByteString topic, final
ByteString subscriberId,
+ final Callback<InMemorySubscriptionState> cb, Object ctx);
+
private class SubscribeOp extends
TopicOpQueuer.AsynchronousOp<SubscriptionData> {
SubscribeRequest subRequest;
MessageSeqId consumeSeqId;
@@ -428,21 +435,21 @@ public abstract class AbstractSubscripti
SubscriptionData.newBuilder().setState(stateBuilder).setPreferences(preferencesBuilder);
final SubscriptionData subData = subDataBuilder.build();
- createSubscriptionData(topic, subscriberId, subData, new
Callback<Void>() {
+ createSubscriptionData(topic, subscriberId, subData, new
Callback<Version>() {
@Override
public void operationFailed(Object ctx, PubSubException
exception) {
cb.operationFailed(ctx, exception);
}
@Override
- public void operationFinished(Object ctx, Void
resultOfOperation) {
+ public void operationFinished(Object ctx, final Version
version) {
Callback<Void> cb2 = new Callback<Void>() {
@Override
public void operationFailed(final Object ctx, final
PubSubException exception) {
logger.error("subscription for subscriber " +
subscriberId.toStringUtf8() + " to topic "
+ topic.toStringUtf8() + " failed due
to failed listener callback", exception);
// should remove subscription when synchronized
cross-region subscription failed
- deleteSubscriptionData(topic, subscriberId, new
Callback<Void>() {
+ deleteSubscriptionData(topic, subscriberId,
version, new Callback<Void>() {
@Override
public void operationFinished(Object context,
Void resultOfOperation) {
@@ -463,7 +470,7 @@ public abstract class AbstractSubscripti
@Override
public void operationFinished(Object ctx, Void
resultOfOperation) {
- topicSubscriptions.put(subscriberId, new
InMemorySubscriptionState(subData));
+ topicSubscriptions.put(subscriberId, new
InMemorySubscriptionState(subData, version));
updateMessageBound(topic);
@@ -477,7 +484,7 @@ public abstract class AbstractSubscripti
&& !hasLocalSubscriptions(topicSubscriptions))
notifyFirstLocalSubscribe(topic,
subRequest.getSynchronous(), cb2, ctx);
else
- cb2.operationFinished(ctx, resultOfOperation);
+ cb2.operationFinished(ctx, null);
}
}, ctx);
}
@@ -613,8 +620,9 @@ public abstract class AbstractSubscripti
cb.operationFailed(ctx, new
PubSubException.ClientNotSubscribedException(""));
return;
}
-
- deleteSubscriptionData(topic, subscriberId, new Callback<Void>() {
+
+ deleteSubscriptionData(topic, subscriberId,
topicSubscriptions.get(subscriberId).getVersion(),
+ new Callback<Void>() {
@Override
public void operationFailed(Object ctx, PubSubException
exception) {
cb.operationFailed(ctx, exception);
@@ -679,44 +687,100 @@ public abstract class AbstractSubscripti
}
}
- private void updateSubscriptionState(ByteString topic, ByteString
subscriberId,
- InMemorySubscriptionState state,
- Callback<Void> callback, Object ctx) {
+ private void updateSubscriptionState(final ByteString topic, final
ByteString subscriberId,
+ final InMemorySubscriptionState state,
+ final Callback<Void> callback, Object
ctx) {
SubscriptionData subData;
+ Callback<Version> cb = new Callback<Version>() {
+ @Override
+ public void operationFinished(Object ctx, Version version) {
+ state.setVersion(version);
+ callback.operationFinished(ctx, null);
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception)
{
+ if (exception instanceof PubSubException.BadVersionException) {
+ readSubscriptionData(topic, subscriberId, new
Callback<InMemorySubscriptionState>() {
+ @Override
+ public void operationFinished(Object ctx,
+ InMemorySubscriptionState resultOfOperation) {
+ state.setVersion(resultOfOperation.getVersion());
+ updateSubscriptionState(topic, subscriberId,
state, callback, ctx);
+ }
+ @Override
+ public void operationFailed(Object ctx,
+ PubSubException exception) {
+ callback.operationFailed(ctx, exception);
+ }
+ }, ctx);
+
+ return;
+ }
+ callback.operationFailed(ctx, exception);
+ }
+ };
if (isPartialUpdateSupported()) {
subData =
SubscriptionData.newBuilder().setState(state.getSubscriptionState()).build();
- updateSubscriptionData(topic, subscriberId, subData, callback,
ctx);
+ updateSubscriptionData(topic, subscriberId, subData,
state.getVersion(), cb, ctx);
} else {
subData = state.toSubscriptionData();
- replaceSubscriptionData(topic, subscriberId, subData, callback,
ctx);
+ replaceSubscriptionData(topic, subscriberId, subData,
state.getVersion(), cb, ctx);
}
}
- private void updateSubscriptionPreferences(ByteString topic, ByteString
subscriberId,
- InMemorySubscriptionState state,
- Callback<Void> callback, Object
ctx) {
+ private void updateSubscriptionPreferences(final ByteString topic, final
ByteString subscriberId,
+ final InMemorySubscriptionState
state,
+ final Callback<Void> callback,
Object ctx) {
SubscriptionData subData;
+ Callback<Version> cb = new Callback<Version>() {
+ @Override
+ public void operationFinished(Object ctx, Version version) {
+ state.setVersion(version);
+ callback.operationFinished(ctx, null);
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception)
{
+ if (exception instanceof PubSubException.BadVersionException) {
+ readSubscriptionData(topic, subscriberId, new
Callback<InMemorySubscriptionState>() {
+ @Override
+ public void operationFinished(Object ctx,
+ InMemorySubscriptionState resultOfOperation) {
+ state.setVersion(resultOfOperation.getVersion());
+ updateSubscriptionPreferences(topic, subscriberId,
state, callback, ctx);
+ }
+ @Override
+ public void operationFailed(Object ctx,
+ PubSubException exception) {
+ callback.operationFailed(ctx, exception);
+ }
+ }, ctx);
+
+ return;
+ }
+ callback.operationFailed(ctx, exception);
+ }
+ };
if (isPartialUpdateSupported()) {
subData =
SubscriptionData.newBuilder().setPreferences(state.getSubscriptionPreferences()).build();
- updateSubscriptionData(topic, subscriberId, subData, callback,
ctx);
+ updateSubscriptionData(topic, subscriberId, subData,
state.getVersion(), cb, ctx);
} else {
subData = state.toSubscriptionData();
- replaceSubscriptionData(topic, subscriberId, subData, callback,
ctx);
+ replaceSubscriptionData(topic, subscriberId, subData,
state.getVersion(), cb, ctx);
}
}
protected abstract boolean isPartialUpdateSupported();
protected abstract void createSubscriptionData(final ByteString topic,
ByteString subscriberId,
- SubscriptionData data, Callback<Void> callback, Object ctx);
+ SubscriptionData data, Callback<Version> callback, Object ctx);
protected abstract void updateSubscriptionData(ByteString topic,
ByteString subscriberId, SubscriptionData data,
- Callback<Void> callback, Object ctx);
+ Version version, Callback<Version> callback, Object ctx);
protected abstract void replaceSubscriptionData(ByteString topic,
ByteString subscriberId, SubscriptionData data,
- Callback<Void> callback, Object ctx);
+ Version version, Callback<Version> callback, Object ctx);
- protected abstract void deleteSubscriptionData(ByteString topic,
ByteString subscriberId, Callback<Void> callback,
+ protected abstract void deleteSubscriptionData(ByteString topic,
ByteString subscriberId, Version version, Callback<Void> callback,
Object ctx);
}
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
Mon Nov 26 17:47:54 2012
@@ -22,6 +22,8 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ScheduledExecutorService;
import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.versioning.Version;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.delivery.DeliveryManager;
@@ -43,13 +45,13 @@ public class InMemorySubscriptionManager
@Override
protected void createSubscriptionData(ByteString topic, ByteString
subscriberId, SubscriptionData subData,
- Callback<Void> callback, Object
ctx) {
+ Callback<Version> callback, Object
ctx) {
// nothing to do, in-memory info is already recorded by base class
callback.operationFinished(ctx, null);
}
@Override
- protected void deleteSubscriptionData(ByteString topic, ByteString
subscriberId, Callback<Void> callback,
+ protected void deleteSubscriptionData(ByteString topic, ByteString
subscriberId, Version version, Callback<Void> callback,
Object ctx) {
// nothing to do, in-memory info is already deleted by base class
callback.operationFinished(ctx, null);
@@ -62,13 +64,13 @@ public class InMemorySubscriptionManager
@Override
protected void updateSubscriptionData(ByteString topic, ByteString
subscriberId, SubscriptionData data,
- Callback<Void> callback, Object ctx)
{
+ Version version, Callback<Version>
callback, Object ctx) {
throw new UnsupportedOperationException("Doesn't support partial
update");
}
@Override
protected void replaceSubscriptionData(ByteString topic, ByteString
subscriberId, SubscriptionData data,
- Callback<Void> callback, Object
ctx) {
+ Version version, Callback<Version>
callback, Object ctx) {
// nothing to do, in-memory info is already updated by base class
callback.operationFinished(ctx, null);
}
@@ -100,4 +102,18 @@ public class InMemorySubscriptionManager
}
+ @Override
+ protected void readSubscriptionData(ByteString topic,
+ ByteString subscriberId, Callback<InMemorySubscriptionState> cb,
Object ctx) {
+ // Since we backed up in-memory information on lostTopic, we can just
return that back
+ InMemorySubscriptionState subState =
top2sub2seqBackup.get(topic).remove(subscriberId);
+
+ if (subState != null) {
+ cb.operationFinished(ctx, subState);
+ } else {
+ cb.operationFinished(ctx, new InMemorySubscriptionState(
+ SubscriptionData.getDefaultInstance(), Version.NEW));
+ }
+ }
+
}
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
Mon Nov 26 17:47:54 2012
@@ -22,6 +22,7 @@ import java.util.Map;
import com.google.protobuf.ByteString;
+import org.apache.bookkeeper.versioning.Version;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
@@ -33,8 +34,9 @@ public class InMemorySubscriptionState {
SubscriptionState subscriptionState;
SubscriptionPreferences subscriptionPreferences;
MessageSeqId lastConsumeSeqId;
+ Version version;
- public InMemorySubscriptionState(SubscriptionData subscriptionData,
MessageSeqId lastConsumeSeqId) {
+ public InMemorySubscriptionState(SubscriptionData subscriptionData,
Version version, MessageSeqId lastConsumeSeqId) {
this.subscriptionState = subscriptionData.getState();
if (subscriptionData.hasPreferences()) {
this.subscriptionPreferences = subscriptionData.getPreferences();
@@ -47,10 +49,11 @@ public class InMemorySubscriptionState {
}
this.lastConsumeSeqId = lastConsumeSeqId;
+ this.version = version;
}
- public InMemorySubscriptionState(SubscriptionData subscriptionData) {
- this(subscriptionData, subscriptionData.getState().getMsgId());
+ public InMemorySubscriptionState(SubscriptionData subscriptionData,
Version version) {
+ this(subscriptionData, version,
subscriptionData.getState().getMsgId());
}
public SubscriptionData toSubscriptionData() {
@@ -72,6 +75,14 @@ public class InMemorySubscriptionState {
public MessageSeqId getLastConsumeSeqId() {
return lastConsumeSeqId;
}
+
+ public Version getVersion() {
+ return version;
+ }
+
+ public void setVersion(Version version) {
+ this.version = version;
+ }
/**
*
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
Mon Nov 26 17:47:54 2012
@@ -32,6 +32,8 @@ import org.apache.hedwig.server.meta.Sub
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
/**
* MetaManager-based subscription manager.
@@ -52,16 +54,17 @@ public class MMSubscriptionManager exten
@Override
protected void readSubscriptions(final ByteString topic,
final Callback<Map<ByteString,
InMemorySubscriptionState>> cb, final Object ctx) {
- subManager.readSubscriptions(topic, new Callback<Map<ByteString,
SubscriptionData>>() {
+ subManager.readSubscriptions(topic, new Callback<Map<ByteString,
Versioned<SubscriptionData>>>() {
@Override
public void operationFailed(Object ctx, PubSubException pse) {
cb.operationFailed(ctx, pse);
}
@Override
- public void operationFinished(Object ctx, Map<ByteString,
SubscriptionData> subs) {
+ public void operationFinished(Object ctx, Map<ByteString,
Versioned<SubscriptionData>> subs) {
Map<ByteString, InMemorySubscriptionState> results = new
ConcurrentHashMap<ByteString, InMemorySubscriptionState>();
- for (Map.Entry<ByteString, SubscriptionData> subEntry :
subs.entrySet()) {
- results.put(subEntry.getKey(), new
InMemorySubscriptionState(subEntry.getValue()));
+ for (Map.Entry<ByteString, Versioned<SubscriptionData>>
subEntry : subs.entrySet()) {
+ Versioned<SubscriptionData> vv = subEntry.getValue();
+ results.put(subEntry.getKey(), new
InMemorySubscriptionState(vv.getValue(), vv.getVersion()));
}
cb.operationFinished(ctx, results);
}
@@ -69,32 +72,54 @@ public class MMSubscriptionManager exten
}
@Override
+ protected void readSubscriptionData(final ByteString topic, final
ByteString subscriberId,
+ final
Callback<InMemorySubscriptionState> cb, final Object ctx) {
+ subManager.readSubscriptionData(topic, subscriberId, new
Callback<Versioned<SubscriptionData>>() {
+ @Override
+ public void operationFinished(Object ctx,
+ Versioned<SubscriptionData> subData) {
+ if (null != subData) {
+ cb.operationFinished(ctx,
+ new InMemorySubscriptionState(subData.getValue(),
subData.getVersion()));
+ } else {
+ cb.operationFinished(ctx, new InMemorySubscriptionState(
+ SubscriptionData.getDefaultInstance(),
Version.NEW));
+ }
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception)
{
+ cb.operationFailed(ctx, exception);
+ }
+ }, ctx);
+ }
+
+ @Override
protected boolean isPartialUpdateSupported() {
return subManager.isPartialUpdateSupported();
}
@Override
protected void createSubscriptionData(final ByteString topic, final
ByteString subscriberId,
- final SubscriptionData subData,
final Callback<Void> callback, final Object ctx) {
+ final SubscriptionData subData,
final Callback<Version> callback, final Object ctx) {
subManager.createSubscriptionData(topic, subscriberId, subData,
callback, ctx);
}
@Override
- protected void replaceSubscriptionData(final ByteString topic, final
ByteString subscriberId,
- final SubscriptionData subData,
final Callback<Void> callback, final Object ctx) {
- subManager.replaceSubscriptionData(topic, subscriberId, subData,
callback, ctx);
+ protected void replaceSubscriptionData(final ByteString topic, final
ByteString subscriberId, final SubscriptionData subData,
+ final Version version, final
Callback<Version> callback, final Object ctx) {
+ subManager.replaceSubscriptionData(topic, subscriberId, subData,
version, callback, ctx);
}
@Override
- protected void updateSubscriptionData(final ByteString topic, final
ByteString subscriberId,
- final SubscriptionData subData,
final Callback<Void> callback, final Object ctx) {
- subManager.updateSubscriptionData(topic, subscriberId, subData,
callback, ctx);
+ protected void updateSubscriptionData(final ByteString topic, final
ByteString subscriberId, final SubscriptionData subData,
+ final Version version, final
Callback<Version> callback, final Object ctx) {
+ subManager.updateSubscriptionData(topic, subscriberId, subData,
version, callback, ctx);
}
@Override
- protected void deleteSubscriptionData(final ByteString topic, final
ByteString subscriberId,
+ protected void deleteSubscriptionData(final ByteString topic, final
ByteString subscriberId, Version version,
final Callback<Void> callback, final
Object ctx) {
- subManager.deleteSubscriptionData(topic, subscriberId, callback, ctx);
+ subManager.deleteSubscriptionData(topic, subscriberId, version,
callback, ctx);
}
@Override
Modified:
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
URL:
http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java?rev=1413745&r1=1413744&r2=1413745&view=diff
==============================================================================
---
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
(original)
+++
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/meta/TestMetadataManager.java
Mon Nov 26 17:47:54 2012
@@ -21,7 +21,6 @@
package org.apache.hedwig.server.meta;
import java.util.Map;
-import java.util.concurrent.SynchronousQueue;
import com.google.protobuf.ByteString;
@@ -35,7 +34,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
import org.apache.hedwig.server.topics.HubInfo;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
+import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.HedwigSocketAddress;
@@ -234,35 +233,46 @@ public class TestMetadataManager extends
ByteString topic = ByteString.copyFromUtf8("testSubscriptionData");
ByteString subid = ByteString.copyFromUtf8("mysub");
- StubCallback<Void> callback = new StubCallback<Void>();
- StubCallback<SubscriptionData> readCallback = new
StubCallback<SubscriptionData>();
- StubCallback<Map<ByteString, SubscriptionData>> subsCallback
- = new StubCallback<Map<ByteString, SubscriptionData>>();
+ final StubCallback<Version> callback = new StubCallback<Version>();
+ StubCallback<Versioned<SubscriptionData>> readCallback = new
StubCallback<Versioned<SubscriptionData>>();
+ StubCallback<Map<ByteString, Versioned<SubscriptionData>>> subsCallback
+ = new StubCallback<Map<ByteString, Versioned<SubscriptionData>>>();
subManager.readSubscriptionData(topic, subid, readCallback, null);
- Either<SubscriptionData, PubSubException> readRes =
readCallback.queue.take();
+ Either<Versioned<SubscriptionData>, PubSubException> readRes =
readCallback.queue.take();
Assert.assertEquals("Found inconsistent subscription state", null,
readRes.left());
Assert.assertEquals("Should not fail with PubSubException", null,
readRes.right());
// read non-existed subscription state
subManager.readSubscriptions(topic, subsCallback, null);
- Either<Map<ByteString, SubscriptionData>, PubSubException> res =
subsCallback.queue.take();
+ Either<Map<ByteString, Versioned<SubscriptionData>>, PubSubException>
res = subsCallback.queue.take();
Assert.assertEquals("Found more than 0 subscribers", 0,
res.left().size());
Assert.assertEquals("Should not fail with PubSubException", null,
res.right());
// update non-existed subscription state
if (subManager.isPartialUpdateSupported()) {
- subManager.updateSubscriptionData(topic, subid,
SubscriptionData.getDefaultInstance(),
- callback, null);
+ subManager.updateSubscriptionData(topic, subid,
+ SubscriptionData.getDefaultInstance(), Version.ANY,
callback, null);
} else {
- subManager.replaceSubscriptionData(topic, subid,
SubscriptionData.getDefaultInstance(),
- callback, null);
+ subManager.replaceSubscriptionData(topic, subid,
+ SubscriptionData.getDefaultInstance(), Version.ANY,
callback, null);
}
Assert.assertTrue("Should fail to update a non-existed subscriber with
PubSubException",
callback.queue.take().right() instanceof
PubSubException.NoSubscriptionStateException);
+ Callback<Void> voidCallback = new Callback<Void>() {
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ callback.operationFinished(ctx, null);
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception)
{
+ callback.operationFailed(ctx, exception);
+ }
+ };
+
// delete non-existed subscription state
- subManager.deleteSubscriptionData(topic, subid, callback, null);
+ subManager.deleteSubscriptionData(topic, subid, Version.ANY,
voidCallback, null);
Assert.assertTrue("Should fail to delete a non-existed subscriber with
PubSubException",
callback.queue.take().right() instanceof
PubSubException.NoSubscriptionStateException);
@@ -276,15 +286,19 @@ public class TestMetadataManager extends
// create a subscription state
subManager.createSubscriptionData(topic, subid, data, callback, null);
+ Either<Version, PubSubException> cbResult = callback.queue.take();
+ Version v1 = cbResult.left();
Assert.assertEquals("Should not fail with PubSubException",
- null, callback.queue.take().right());
+ null, cbResult.right());
// read subscriptions
subManager.readSubscriptions(topic, subsCallback, null);
res = subsCallback.queue.take();
Assert.assertEquals("Should find just 1 subscriber", 1,
res.left().size());
Assert.assertEquals("Should not fail with PubSubException", null,
res.right());
- SubscriptionData imss = res.left().get(subid);
+ Versioned<SubscriptionData> versionedSubData = res.left().get(subid);
+ Assert.assertEquals(Version.Occurred.CONCURRENTLY,
v1.compare(versionedSubData.getVersion()));
+ SubscriptionData imss = versionedSubData.getValue();
Assert.assertEquals("Found inconsistent subscription state",
data, imss);
Assert.assertEquals("Found inconsistent last consumed seq id",
@@ -298,32 +312,46 @@ public class TestMetadataManager extends
stateBuilder =
SubscriptionState.newBuilder(data.getState()).setMsgId(msgId);
data = SubscriptionData.newBuilder().setState(stateBuilder).build();
-
+
// update subscription state
if (subManager.isPartialUpdateSupported()) {
- subManager.updateSubscriptionData(topic, subid, data, callback,
null);
+ subManager.updateSubscriptionData(topic, subid, data,
versionedSubData.getVersion(), callback, null);
} else {
- subManager.replaceSubscriptionData(topic, subid, data, callback,
null);
+ subManager.replaceSubscriptionData(topic, subid, data,
versionedSubData.getVersion(), callback, null);
}
- Assert.assertEquals("Fail to update a subscription state", null,
callback.queue.take().right());
-
+ cbResult = callback.queue.take();
+ Assert.assertEquals("Fail to update a subscription state", null,
cbResult.right());
+ Version v2 = cbResult.left();
// read subscription state
subManager.readSubscriptionData(topic, subid, readCallback, null);
Assert.assertEquals("Found inconsistent subscription state",
- data, readCallback.queue.take().left());
-
+ data, readCallback.queue.take().left().getValue());
+
// read subscriptions again
subManager.readSubscriptions(topic, subsCallback, null);
res = subsCallback.queue.take();
Assert.assertEquals("Should find just 1 subscriber", 1,
res.left().size());
Assert.assertEquals("Should not fail with PubSubException", null,
res.right());
- imss = res.left().get(subid);
+ versionedSubData = res.left().get(subid);
+ Assert.assertEquals(Version.Occurred.CONCURRENTLY,
v2.compare(versionedSubData.getVersion()));
+ imss = res.left().get(subid).getValue();
Assert.assertEquals("Found inconsistent subscription state",
data, imss);
Assert.assertEquals("Found inconsistent last consumed seq id",
seqId,
imss.getState().getMsgId().getLocalComponent());
- subManager.deleteSubscriptionData(topic, subid, callback, null);
+ // update or replace subscription data with bad version
+ if (subManager.isPartialUpdateSupported()) {
+ subManager.updateSubscriptionData(topic, subid, data, v1,
callback, null);
+ } else {
+ subManager.replaceSubscriptionData(topic, subid, data, v1,
callback, null);
+ }
+ Assert.assertTrue(callback.queue.take().right() instanceof
PubSubException.BadVersionException);
+
+ // delete with bad version
+ subManager.deleteSubscriptionData(topic, subid, v1, voidCallback,
null);
+ Assert.assertTrue(callback.queue.take().right() instanceof
PubSubException.BadVersionException);
+ subManager.deleteSubscriptionData(topic, subid,
res.left().get(subid).getVersion(), voidCallback, null);
Assert.assertEquals("Fail to delete an existed subscriber", null,
callback.queue.take().right());
// read subscription states again