[ 
https://issues.apache.org/jira/browse/BOOKKEEPER-310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13414954#comment-13414954
 ] 

Sijie Guo commented on BOOKKEEPER-310:
--------------------------------------

@Mridul, 

{quote}
The testcases you mention are already present under activemq's testcases under 
jms provider : I think I already specified this earlier. 
{quote}

As I commented before, this is a general feature added to the hedwig client,
not only for the jms client, so we have to ensure it works correctly from the
hedwig client's perspective. A robust test case is therefore necessary.

I wrote a test case in TestPubSubClient (in the hedwig-server module), shown
below. Does it make sense to you?

{code}
+    @Test
+    public void testAsyncPublishWithResponse() throws Exception {
+        ByteString topic =
+            ByteString.copyFromUtf8("testAsyncPublishWithResponse");
+        ByteString subid = ByteString.copyFromUtf8("mysubid");
+
+        final String prefix = "AsyncMessage-";
+        final int numMessages = 30;
+
+        final AtomicInteger numPublished = new AtomicInteger(0);
+        final CountDownLatch publishLatch = new CountDownLatch(1);
+        final Map<String, MessageSeqId> publishedMsgs =
+            new HashMap<String, MessageSeqId>();
+
+        final AtomicInteger numReceived = new AtomicInteger(0);
+        final CountDownLatch receiveLatch = new CountDownLatch(1);
+        final Map<String, MessageSeqId> receivedMsgs =
+            new HashMap<String, MessageSeqId>();
+
+        subscriber.subscribe(topic, subid, CreateOrAttach.CREATE_OR_ATTACH);
+        subscriber.startDelivery(topic, subid, new MessageHandler() {
+            public synchronized void deliver(ByteString topic,
+                                             ByteString subscriberId,
+                                             Message msg,
+                                             Callback<Void> callback,
+                                             Object context) {
+                String str = msg.getBody().toStringUtf8();
+                receivedMsgs.put(str, msg.getMsgId()); 
+                if (numMessages == numReceived.incrementAndGet()) {
+                    receiveLatch.countDown();
+                }
+                callback.operationFinished(context, null);
+            }
+        });
+
+        for (int i=0; i<numMessages; i++) {
+            final String str = prefix + i;
+            ByteString data = ByteString.copyFromUtf8(str);
+            Message msg = Message.newBuilder().setBody(data).build();
+            publisher.asyncPublishWithResponse(
+                topic, msg, new Callback<PublishResponse>() {
+                @Override
+                public void operationFinished(Object ctx,
+                                              PublishResponse response) {
+                    publishedMsgs.put(str, response.getPublishedMsgId());
+                    if (numMessages == numPublished.incrementAndGet()) {
+                        publishLatch.countDown();
+                    }
+                }
+                @Override
+                public void operationFailed(Object ctx,
+                                            final PubSubException exception) {
+                    publishLatch.countDown();
+                }
+            }, null);
+        }
+        assertTrue("Timed out waiting on callback for publish requests.",
+                   publishLatch.await(10, TimeUnit.SECONDS));
+        assertEquals("Expected " + numMessages + " publishes.",
+                     numMessages, numPublished.get());
+        assertEquals("Expected " + numMessages + " publish responses.",
+                     numMessages, publishedMsgs.size());
+
+        assertTrue("Timed out waiting on callback for messages.",
+                   receiveLatch.await(30, TimeUnit.SECONDS));
+        assertEquals("Expected " + numMessages + " messages.",
+                     numMessages, numReceived.get());
+        assertEquals("Expected " + numMessages + " messages in map.",
+                     numMessages, receivedMsgs.size());
+
+        for (int i=0; i<numMessages; i++) {
+            final String str = prefix + i;
+            MessageSeqId pubId = publishedMsgs.get(str);
+            MessageSeqId revId = receivedMsgs.get(str);
+            assertTrue("Did not receive the same message seq id for " + str,
+                       pubId.equals(revId));
+        }
+    }
{code}
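
The latch-and-callback pattern used in the test above can be reduced to a
self-contained sketch, independent of hedwig. Everything in it is an
assumption made for illustration: FakePublisher, SeqIdCallback and publishAll
are hypothetical names, not hedwig APIs, and the sequence ids are plain longs
rather than MessageSeqId. The sketch differs from the test in two ways: it
uses a ConcurrentHashMap because the callbacks here run on several worker
threads, and it sizes the latch to the number of messages instead of pairing a
latch of 1 with an AtomicInteger.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class AsyncCallbackLatchSketch {

    /** Hypothetical stand-in for a publish callback; not the hedwig Callback interface. */
    interface SeqIdCallback {
        void operationFinished(long seqId);
    }

    /** Hypothetical in-memory publisher that assigns sequence ids on worker threads. */
    static final class FakePublisher {
        private final ExecutorService pool = Executors.newFixedThreadPool(4);
        private final AtomicLong nextSeqId = new AtomicLong(0);

        void asyncPublish(String body, SeqIdCallback cb) {
            // The callback fires on a pool thread, never on the caller's thread.
            pool.execute(() -> cb.operationFinished(nextSeqId.incrementAndGet()));
        }

        void shutdown() {
            pool.shutdown();
        }
    }

    /** Publishes numMessages bodies and returns the body -> sequence id map. */
    static Map<String, Long> publishAll(int numMessages, long timeoutSeconds) {
        // Thread-safe map: callbacks may write from different threads.
        final Map<String, Long> published = new ConcurrentHashMap<>();
        // One count per expected callback.
        final CountDownLatch latch = new CountDownLatch(numMessages);
        FakePublisher publisher = new FakePublisher();
        try {
            for (int i = 0; i < numMessages; i++) {
                final String body = "AsyncMessage-" + i;
                publisher.asyncPublish(body, seqId -> {
                    published.put(body, seqId);
                    latch.countDown();
                });
            }
            if (!latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new IllegalStateException("Timed out waiting for publish callbacks");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            publisher.shutdown();
        }
        return published;
    }

    public static void main(String[] args) {
        Map<String, Long> ids = publishAll(30, 10);
        System.out.println("Collected " + ids.size() + " sequence ids");
    }
}
```

Whether the real test needs the thread-safe map depends on which threads the
hedwig client invokes publish callbacks on, which I have not checked here.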
                
> Changes in hedwig server to support JMS spec
> --------------------------------------------
>
>                 Key: BOOKKEEPER-310
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-310
>             Project: Bookkeeper
>          Issue Type: Sub-task
>            Reporter: Mridul Muralidharan
>         Attachments: hedwig-server.patch, hedwig-server.patch.1, 
> hedwig-server.patch.3, hedwig-server.patch.4
>
>
> The primary changes are :
> a) Support modified protocol changes (optional body).
> b) Return the published message's seq-id in the response.
> c) Minor bugfix to Array indexing in bucket which was triggered in a testcase.

