[
https://issues.apache.org/jira/browse/BOOKKEEPER-310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13414954#comment-13414954
]
Sijie Guo commented on BOOKKEEPER-310:
--------------------------------------
@Mridul,
{quote}
The testcases you mention are already present under activemq's testcases under
jms provider : I think I already specified this earlier.
{quote}
As I commented before, this is a general feature added to the hedwig client,
not only to the jms client, so we have to ensure it works correctly from the
hedwig client's perspective. A robust test case is therefore necessary.
I wrote a test case in TestPubSubClient (hedwig-server module), shown below.
Does it make sense to you?
{code}
+ @Test
+ public void testAsyncPublishWithResponse() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("testAsyncPublishWithResponse");
+ ByteString subid = ByteString.copyFromUtf8("mysubid");
+
+ final String prefix = "AsyncMessage-";
+ final int numMessages = 30;
+
+ final AtomicInteger numPublished = new AtomicInteger(0);
+ final CountDownLatch publishLatch = new CountDownLatch(1);
+ final Map<String, MessageSeqId> publishedMsgs =
+ new HashMap<String, MessageSeqId>();
+
+ final AtomicInteger numReceived = new AtomicInteger(0);
+ final CountDownLatch receiveLatch = new CountDownLatch(1);
+ final Map<String, MessageSeqId> receivedMsgs =
+ new HashMap<String, MessageSeqId>();
+
+ subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+ subscriber.startDelivery(topic, subid, new MessageHandler() {
+ synchronized public void deliver(ByteString topic, ByteString subscriberId,
+ Message msg, Callback<Void> callback, Object context) {
+ String str = msg.getBody().toStringUtf8();
+ receivedMsgs.put(str, msg.getMsgId());
+ if (numMessages == numReceived.incrementAndGet()) {
+ receiveLatch.countDown();
+ }
+ callback.operationFinished(context, null);
+ }
+ });
+
+ for (int i=0; i<numMessages; i++) {
+ final String str = prefix + i;
+ ByteString data = ByteString.copyFromUtf8(str);
+ Message msg = Message.newBuilder().setBody(data).build();
+ publisher.asyncPublishWithResponse(topic, msg, new Callback<PublishResponse>() {
+ @Override
+ public void operationFinished(Object ctx, PublishResponse response) {
+ publishedMsgs.put(str, response.getPublishedMsgId());
+ if (numMessages == numPublished.incrementAndGet()) {
+ publishLatch.countDown();
+ }
+ }
+ @Override
+ public void operationFailed(Object ctx, final PubSubException exception) {
+ publishLatch.countDown();
+ }
+ }, null);
+ }
+ assertTrue("Timed out waiting on callback for publish requests.",
+ publishLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Expected " + numMessages + " publishes.",
+ numMessages, numPublished.get());
+ assertEquals("Expected " + numMessages + " publish responses.",
+ numMessages, publishedMsgs.size());
+
+ assertTrue("Timed out waiting on callback for messages.",
+ receiveLatch.await(30, TimeUnit.SECONDS));
+ assertEquals("Expected " + numMessages + " messages.",
+ numMessages, numReceived.get());
+ assertEquals("Expected " + numMessages + " messages in map.",
+ numMessages, receivedMsgs.size());
+
+ for (int i=0; i<numMessages; i++) {
+ final String str = prefix + i;
+ MessageSeqId pubId = publishedMsgs.get(str);
+ MessageSeqId revId = receivedMsgs.get(str);
+ assertEquals("Received a different message seq id for " + str,
+ pubId, revId);
+ }
+ }
{code}
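For illustration only, the latch-and-map pattern the test relies on can be sketched without Hedwig at all. Everything below is hypothetical and uses only the JDK: `FakePublisher` and `PublishCallback` are stand-ins for the real publisher and `Callback<PublishResponse>`, and the seq ids are plain longs rather than `MessageSeqId`. It differs from the test above in two ways that are assumptions on my side, not part of the patch: the latch counts every callback instead of using a counter plus a latch of 1, and the result map is a `ConcurrentHashMap` because callbacks may arrive on several threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

final class AsyncPublishSketch {

    /** Hypothetical stand-in for Hedwig's Callback<PublishResponse>. */
    interface PublishCallback {
        void operationFinished(long seqId);
        void operationFailed(Exception cause);
    }

    /** Hypothetical publisher: hands out increasing seq ids from a worker pool. */
    static final class FakePublisher {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);
        private final AtomicLong nextSeqId = new AtomicLong(1);

        void asyncPublish(String body, PublishCallback cb) {
            // The callback runs on a pool thread, not on the caller's thread.
            pool.execute(() -> cb.operationFinished(nextSeqId.getAndIncrement()));
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    /** Publishes numMessages bodies and returns body -> seq id once all callbacks fired. */
    static Map<String, Long> publishAll(int numMessages, long timeoutSeconds) throws Exception {
        FakePublisher publisher = new FakePublisher();
        // Thread-safe map: several callbacks may call put() concurrently.
        Map<String, Long> published = new ConcurrentHashMap<>();
        // One count per message, so success and failure both release the waiter.
        CountDownLatch done = new CountDownLatch(numMessages);
        AtomicReference<Exception> failure = new AtomicReference<>();
        try {
            for (int i = 0; i < numMessages; i++) {
                final String body = "AsyncMessage-" + i;
                publisher.asyncPublish(body, new PublishCallback() {
                    @Override
                    public void operationFinished(long seqId) {
                        published.put(body, seqId);
                        done.countDown();
                    }

                    @Override
                    public void operationFailed(Exception cause) {
                        failure.compareAndSet(null, cause); // keep the first failure
                        done.countDown();
                    }
                });
            }
            if (!done.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for publish callbacks");
            }
            if (failure.get() != null) {
                throw failure.get();
            }
            return published;
        } finally {
            publisher.shutdown();
        }
    }
}
```

Calling `AsyncPublishSketch.publishAll(30, 10)` mirrors the publish half of the test: it returns only after all 30 callbacks have fired, or throws on timeout or on the first reported failure. The received seq ids could then be compared against this map in the same way the test does.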
> Changes in hedwig server to support JMS spec
> --------------------------------------------
>
> Key: BOOKKEEPER-310
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-310
> Project: Bookkeeper
> Issue Type: Sub-task
> Reporter: Mridul Muralidharan
> Attachments: hedwig-server.patch, hedwig-server.patch.1,
> hedwig-server.patch.3, hedwig-server.patch.4
>
>
> The primary changes are :
> a) Support modified protocol changes (optional body).
> b) Return the published message's seq-id in the response.
> c) Minor bugfix to Array indexing in bucket which was triggered in a testcase.