Updated Branches:
  refs/heads/master 2e4c66675 -> 3e54454a2

Get a set of messages by id.


Project: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/commit/3e54454a
Tree: 
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/tree/3e54454a
Diff: 
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/diff/3e54454a

Branch: refs/heads/master
Commit: 3e54454a2a9d2437fa5836e8209c18c414f8554e
Parents: 2e4c666
Author: Everett Toews <[email protected]>
Authored: Sun Nov 24 18:09:04 2013 -0600
Committer: Everett Toews <[email protected]>
Committed: Sun Nov 24 19:50:52 2013 -0600

----------------------------------------------------------------------
 .../marconi/v1/binders/BindIdsToQueryParam.java |  44 +++++++
 .../marconi/v1/domain/MessageStream.java        |   8 +-
 .../openstack/marconi/v1/domain/Queues.java     |   1 -
 .../marconi/v1/features/MessageApi.java         |  44 +++++--
 .../openstack/marconi/v1/features/QueueApi.java |   2 +-
 .../marconi/v1/functions/ParseMessages.java     | 103 ----------------
 .../v1/functions/ParseMessagesCreated.java      |   2 +-
 .../v1/functions/ParseMessagesToList.java       |  57 +++++++++
 .../v1/functions/ParseMessagesToStream.java     | 101 ++++++++++++++++
 .../marconi/v1/functions/ParseQueueStats.java   |   2 +-
 .../marconi/v1/options/ListQueuesOptions.java   |   2 +-
 .../v1/options/StreamMessagesOptions.java       | 119 +++++++++++++++++++
 .../marconi/v1/options/StreamOptions.java       | 119 -------------------
 .../marconi/v1/features/MessageApiLiveTest.java |  33 ++++-
 .../marconi/v1/features/MessageApiMockTest.java |  34 +++++-
 15 files changed, 428 insertions(+), 243 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java
new file mode 100644
index 0000000..48c2fae
--- /dev/null
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.binders;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
+import org.jclouds.http.HttpRequest;
+import org.jclouds.rest.Binder;
+
+import javax.inject.Singleton;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @author Everett Toews
+ */
+@Singleton
+public class BindIdsToQueryParam implements Binder {
+
+   @SuppressWarnings("unchecked")
+   @Override
+   public <R extends HttpRequest> R bindToRequest(R request, Object input) {
+      checkArgument(input instanceof Iterable<?>, "This binder is only valid 
for Iterable");
+      Iterable<String> ids = (Iterable<String>) checkNotNull(input, "Iterable 
of Strings");
+      checkArgument(Iterables.size(ids) > 0, "You must specify at least one 
id");
+
+      return (R) request.toBuilder().replaceQueryParam("ids", 
Joiner.on(',').join(ids)).build();
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
index 84b67b5..4a77a1b 100644
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java
@@ -21,7 +21,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
-import org.jclouds.openstack.marconi.v1.options.StreamOptions;
+import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions;
 import org.jclouds.openstack.v2_0.domain.Link;
 import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
 
@@ -37,8 +37,8 @@ public class MessageStream extends 
PaginatedCollection<Message> {
     *
     * @return The options necessary to get the next page of messages.
     */
-   public StreamOptions nextStreamOptions() {
-      return StreamOptions.class.cast(nextMarker().get());
+   public StreamMessagesOptions nextStreamOptions() {
+      return StreamMessagesOptions.class.cast(nextMarker().get());
    }
 
    @Override
@@ -58,7 +58,7 @@ public class MessageStream extends 
PaginatedCollection<Message> {
       @Override
       public Object apply(Link link) {
          Multimap<String, String> queryParams = 
queryParser().apply(link.getHref().getRawQuery());
-         StreamOptions paginationOptions = 
StreamOptions.Builder.queryParameters(queryParams);
+         StreamMessagesOptions paginationOptions = 
StreamMessagesOptions.Builder.queryParameters(queryParams);
 
          return paginationOptions;
       }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
index 6ef6a93..d5f3d74 100644
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import org.jclouds.openstack.marconi.v1.options.ListQueuesOptions;
-import org.jclouds.openstack.marconi.v1.options.StreamOptions;
 import org.jclouds.openstack.v2_0.domain.Link;
 import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
 

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
index ecfb2c5..9b9fcb8 100644
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java
@@ -17,14 +17,16 @@
 package org.jclouds.openstack.marconi.v1.features;
 
 import org.jclouds.Fallbacks;
-import org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks;
 import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest;
+import org.jclouds.openstack.marconi.v1.binders.BindIdsToQueryParam;
 import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
-import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.domain.Message;
 import org.jclouds.openstack.marconi.v1.domain.MessageStream;
-import org.jclouds.openstack.marconi.v1.functions.ParseMessages;
+import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream;
 import org.jclouds.openstack.marconi.v1.functions.ParseMessagesCreated;
-import org.jclouds.openstack.marconi.v1.options.StreamOptions;
+import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToList;
+import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions;
 import org.jclouds.rest.annotations.BinderParam;
 import org.jclouds.rest.annotations.Fallback;
 import org.jclouds.rest.annotations.RequestFilters;
@@ -38,10 +40,14 @@ import javax.ws.rs.GET;
 import javax.ws.rs.HeaderParam;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import java.util.List;
 import java.util.UUID;
 
+import static org.jclouds.Fallbacks.EmptyListOnNotFoundOr404;
+import static 
org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404;
+
 /**
  * Provides access to Messages via their REST API.
  *
@@ -69,12 +75,36 @@ public interface MessageApi {
    MessagesCreated create(@HeaderParam("Client-ID") UUID clientId,
                           @BinderParam(BindToJsonPayload.class) 
List<CreateMessage> messages);
 
+   /**
+    * Streams the messages off of a queue. In a very active queue it's 
possible that you could continuously stream
+    * messages indefinitely.
+    *
+    * @param clientId A UUID for each client instance.
+    * @param options Options for streaming messages to your client.
+    */
    @Named("message:stream")
    @GET
-   @ResponseParser(ParseMessages.class)
+   @ResponseParser(ParseMessagesToStream.class)
    @Consumes(MediaType.APPLICATION_JSON)
-   @Fallback(KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404.class)
+   @Fallback(EmptyPaginatedCollectionOnNotFoundOr404.class)
    @Path("/messages")
    MessageStream stream(@HeaderParam("Client-ID") UUID clientId,
-                        StreamOptions... options);
+                        StreamMessagesOptions... options);
+
+   /**
+    * List specific messages. Unlike the stream method, a client's own 
messages are always returned in this operation.
+    *
+    * @param clientId A UUID for each client instance.
+    * @param ids Specifies the IDs of the messages to get.
+    */
+   @Named("message:list")
+   @GET
+   @ResponseParser(ParseMessagesToList.class)
+   @Consumes(MediaType.APPLICATION_JSON)
+   @Path("/messages")
+   @Fallback(EmptyListOnNotFoundOr404.class)
+   List<Message> list(@HeaderParam("Client-ID") UUID clientId,
+                      @BinderParam(BindIdsToQueryParam.class) Iterable<String> 
ids);
+
+   // TODO: list by claim id when claim API done
 }

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
index a2fc044..c12bbf3 100644
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java
@@ -110,7 +110,7 @@ public interface QueueApi {
    /**
     * Use this method to manually page through the list of queues.
     */
-   @Named("record:list")
+   @Named("queue:list")
    @GET
    @ResponseParser(ParseQueues.class)
    @Consumes(MediaType.APPLICATION_JSON)

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
deleted file mode 100644
index 163483d..0000000
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.jclouds.openstack.marconi.v1.functions;
-
-import com.google.common.base.Function;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import org.jclouds.http.HttpResponse;
-import org.jclouds.http.functions.ParseJson;
-import org.jclouds.openstack.marconi.v1.domain.Message;
-import org.jclouds.openstack.marconi.v1.domain.MessageStream;
-import org.jclouds.openstack.v2_0.domain.Link;
-import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
-
-import javax.inject.Inject;
-import java.beans.ConstructorProperties;
-import java.util.List;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * @author Everett Toews
- */
-public class ParseMessages implements Function<HttpResponse, MessageStream> {
-
-   private final ParseJson<MessagesWithHref> json;
-
-   @Inject
-   ParseMessages(ParseJson<MessagesWithHref> json) {
-      this.json = checkNotNull(json, "json");
-   }
-
-   @Override
-   public MessageStream apply(HttpResponse response) {
-      // An empty message stream has a 204 response code
-      if (response.getStatusCode() == 204) {
-         return new Messages(ImmutableSet.<Message> of(), ImmutableSet.<Link> 
of());
-      }
-
-      MessagesWithHref messagesWithHref = json.apply(response);
-      Iterable<Message> messages = Iterables.transform(messagesWithHref, 
TO_MESSAGE);
-
-      return new Messages(messages, messagesWithHref.getLinks());
-   }
-
-   private static String getMessageId(String rawMessageHref) {
-      // strip off everything but the message id
-      return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1);
-   }
-
-   protected static final Function<MessageWithHref, Message> TO_MESSAGE = new 
Function<MessageWithHref, Message>() {
-      @Override
-      public Message apply(MessageWithHref messageWithHref) {
-         return 
messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build();
-      }
-   };
-
-   protected static final Function<String, String> TO_MESSAGE_ID = new 
Function<String, String>() {
-      @Override
-      public String apply(String messageIdWithHref) {
-         return getMessageId(messageIdWithHref);
-      }
-   };
-
-   private static class Messages extends MessageStream {
-
-      @ConstructorProperties({ "messages", "links" })
-      protected Messages(Iterable<Message> messages, Iterable<Link> links) {
-         super(messages, links);
-      }
-   }
-
-   private static class MessagesWithHref extends 
PaginatedCollection<MessageWithHref> {
-
-      @ConstructorProperties({ "messages", "links" })
-      protected MessagesWithHref(Iterable<MessageWithHref> messagesWithHref, 
Iterable<Link> links) {
-         super(messagesWithHref, links);
-      }
-   }
-
-   private static class MessageWithHref extends Message {
-
-      @ConstructorProperties({ "href", "ttl", "body", "age" })
-      protected MessageWithHref(String href, int ttl, String body, int age) {
-         super(href, ttl, body, age);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
index d2d4d83..9be3722 100644
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java
@@ -26,7 +26,7 @@ import 
org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
 import java.util.List;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
+import static 
org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
 
 /**
  * This parses the messages created on a queue.

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java
new file mode 100644
index 0000000..d7d0b77
--- /dev/null
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+
+import javax.inject.Inject;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.Iterables.transform;
+import static com.google.common.collect.Lists.newArrayList;
+import static 
org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.MessageWithHref;
+import static 
org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE;
+
+/**
+ * @author Everett Toews
+ */
+public class ParseMessagesToList implements Function<HttpResponse, 
List<Message>> {
+
+   private final ParseJson<List<MessageWithHref>> json;
+
+   @Inject
+   ParseMessagesToList(ParseJson<List<MessageWithHref>> json) {
+      this.json = checkNotNull(json, "json");
+   }
+
+   @Override
+   public List<Message> apply(HttpResponse response) {
+      // An empty message stream has a 204 response code
+      if (response.getStatusCode() == 204) {
+         return ImmutableList.of();
+      }
+
+      List<MessageWithHref> messagesWithHref = json.apply(response);
+      return Lists.newArrayList(transform(messagesWithHref, TO_MESSAGE));
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
new file mode 100644
index 0000000..76a9ba7
--- /dev/null
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.functions;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import org.jclouds.http.HttpResponse;
+import org.jclouds.http.functions.ParseJson;
+import org.jclouds.openstack.marconi.v1.domain.Message;
+import org.jclouds.openstack.marconi.v1.domain.MessageStream;
+import org.jclouds.openstack.v2_0.domain.Link;
+import org.jclouds.openstack.v2_0.domain.PaginatedCollection;
+
+import javax.inject.Inject;
+import java.beans.ConstructorProperties;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * @author Everett Toews
+ */
+public class ParseMessagesToStream implements Function<HttpResponse, 
MessageStream> {
+
+   private final ParseJson<MessagesWithHref> json;
+
+   @Inject
+   ParseMessagesToStream(ParseJson<MessagesWithHref> json) {
+      this.json = checkNotNull(json, "json");
+   }
+
+   @Override
+   public MessageStream apply(HttpResponse response) {
+      // An empty message stream has a 204 response code
+      if (response.getStatusCode() == 204) {
+         return new Messages(ImmutableSet.<Message> of(), ImmutableSet.<Link> 
of());
+      }
+
+      MessagesWithHref messagesWithHref = json.apply(response);
+      Iterable<Message> messages = Iterables.transform(messagesWithHref, 
TO_MESSAGE);
+
+      return new Messages(messages, messagesWithHref.getLinks());
+   }
+
+   private static String getMessageId(String rawMessageHref) {
+      // strip off everything but the message id
+      return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1);
+   }
+
+   protected static final Function<MessageWithHref, Message> TO_MESSAGE = new 
Function<MessageWithHref, Message>() {
+      @Override
+      public Message apply(MessageWithHref messageWithHref) {
+         return 
messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build();
+      }
+   };
+
+   protected static final Function<String, String> TO_MESSAGE_ID = new 
Function<String, String>() {
+      @Override
+      public String apply(String messageIdWithHref) {
+         return getMessageId(messageIdWithHref);
+      }
+   };
+
+   private static class Messages extends MessageStream {
+
+      @ConstructorProperties({ "messages", "links" })
+      protected Messages(Iterable<Message> messages, Iterable<Link> links) {
+         super(messages, links);
+      }
+   }
+
+   private static class MessagesWithHref extends 
PaginatedCollection<MessageWithHref> {
+
+      @ConstructorProperties({ "messages", "links" })
+      protected MessagesWithHref(Iterable<MessageWithHref> messagesWithHref, 
Iterable<Link> links) {
+         super(messagesWithHref, links);
+      }
+   }
+
+   protected static class MessageWithHref extends Message {
+
+      @ConstructorProperties({ "href", "ttl", "body", "age" })
+      protected MessageWithHref(String href, int ttl, String body, int age) {
+         super(href, ttl, body, age);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
index fb87c6c..e6f0ee2 100644
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java
@@ -25,7 +25,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesStats;
 import org.jclouds.openstack.marconi.v1.domain.QueueStats;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static 
org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID;
+import static 
org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID;
 
 /**
  * This parses the stats of a queue.

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
index 1ce60d4..929529d 100644
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java
@@ -67,7 +67,7 @@ public class ListQueuesOptions extends PaginationOptions {
    }
 
    /**
-    * @return The String representation of the marker for these StreamOptions.
+    * @return The String representation of the marker for these 
StreamMessagesOptions.
     */
    public String getMarker() {
       return Iterables.getOnlyElement(queryParameters.get("marker"));

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
new file mode 100644
index 0000000..b0ff396
--- /dev/null
+++ 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.jclouds.openstack.marconi.v1.options;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Multimap;
+import org.jclouds.openstack.v2_0.options.PaginationOptions;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Options used to control the messages returned in the response.
+ */
+public class StreamMessagesOptions extends PaginationOptions {
+
+   public static final StreamMessagesOptions NONE = new 
StreamMessagesOptions();
+
+   /**
+    * {@inheritDoc}
+    */
+   @Override
+   public StreamMessagesOptions queryParameters(Multimap<String, String> 
queryParams) {
+      checkNotNull(queryParams, "queryParams");
+      queryParameters.putAll(queryParams);
+      return this;
+   }
+
+   /**
+    * @see Builder#marker(String)
+    */
+   @Override
+   public StreamMessagesOptions marker(String marker) {
+      super.marker(marker);
+      return this;
+   }
+
+   /**
+    * @see Builder#limit(int)
+    */
+   @Override
+   public StreamMessagesOptions limit(int limit) {
+      super.limit(limit);
+      return this;
+
+   }
+
+   /**
+    * @see Builder#echo(boolean)
+    */
+   public StreamMessagesOptions echo(boolean echo) {
+      queryParameters.put("echo", Boolean.toString(echo));
+      return this;
+   }
+
+   /**
+    * @return The String representation of the marker for these 
StreamMessagesOptions.
+    */
+   public String getMarker() {
+      return Iterables.getOnlyElement(queryParameters.get("marker"));
+   }
+
+   public static class Builder {
+      /**
+       * @see PaginationOptions#queryParameters(Multimap)
+       */
+      public static StreamMessagesOptions queryParameters(Multimap<String, 
String> queryParams) {
+         StreamMessagesOptions options = new StreamMessagesOptions();
+         return options.queryParameters(queryParams);
+      }
+
+      /**
+       * Specifies an opaque string that the client can use to request the 
next batch of messages. The marker parameter
+       * communicates to the server which messages the client has already 
received. If you do not specify a value, the
+       * API returns all messages at the head of the queue (up to the limit).
+       * </p>
+       * Clients should make no assumptions about the format or length of the 
marker. Furthermore, clients should assume
+       * that there is no relationship between markers and message IDs.
+       */
+      public static StreamMessagesOptions marker(String marker) {
+         StreamMessagesOptions options = new StreamMessagesOptions();
+         return options.marker(marker);
+      }
+
+      /**
+       * When more messages are available than can be returned in a single 
request, the client can pick up the next
+       * batch of messages by simply using the {@see StremOptions} returned 
from the previous call in {@code
+       * MessageStream#nextStreamOptions()}. Specifies up to 10 messages (the 
default value) to return. If you do not
+       * specify a value for the limit parameter, the default value of 10 is 
used.
+       */
+      public static StreamMessagesOptions limit(int limit) {
+         StreamMessagesOptions options = new StreamMessagesOptions();
+         return options.limit(limit);
+      }
+
+      /**
+       * The echo parameter determines whether the API returns a client's own 
messages, as determined by the clientId
+       * (UUID) portion of the client. If you do not specify a value, echo 
uses the default value of false. If you are
+       * experimenting with the API, you might want to set echo=true in order 
to see the messages that you posted.
+       */
+      public static StreamMessagesOptions echo(boolean echo) {
+         StreamMessagesOptions options = new StreamMessagesOptions();
+         return options.echo(echo);
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
deleted file mode 100644
index be2fffb..0000000
--- 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.jclouds.openstack.marconi.v1.options;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Multimap;
-import org.jclouds.openstack.v2_0.options.PaginationOptions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Options used to control the messages returned in the response.
- */
-public class StreamOptions extends PaginationOptions {
-
-   public static final StreamOptions NONE = new StreamOptions();
-
-   /**
-    * {@inheritDoc}
-    */
-   @Override
-   public StreamOptions queryParameters(Multimap<String, String> queryParams) {
-      checkNotNull(queryParams, "queryParams");
-      queryParameters.putAll(queryParams);
-      return this;
-   }
-
-   /**
-    * @see Builder#marker(String)
-    */
-   @Override
-   public StreamOptions marker(String marker) {
-      super.marker(marker);
-      return this;
-   }
-
-   /**
-    * @see Builder#limit(int)
-    */
-   @Override
-   public StreamOptions limit(int limit) {
-      super.limit(limit);
-      return this;
-
-   }
-
-   /**
-    * @see Builder#echo(boolean)
-    */
-   public StreamOptions echo(boolean echo) {
-      queryParameters.put("echo", Boolean.toString(echo));
-      return this;
-   }
-
-   /**
-    * @return The String representation of the marker for these StreamOptions.
-    */
-   public String getMarker() {
-      return Iterables.getOnlyElement(queryParameters.get("marker"));
-   }
-
-   public static class Builder {
-      /**
-       * @see PaginationOptions#queryParameters(Multimap)
-       */
-      public static StreamOptions queryParameters(Multimap<String, String> 
queryParams) {
-         StreamOptions options = new StreamOptions();
-         return options.queryParameters(queryParams);
-      }
-
-      /**
-       * Specifies an opaque string that the client can use to request the 
next batch of messages. The marker parameter
-       * communicates to the server which messages the client has already 
received. If you do not specify a value, the
-       * API returns all messages at the head of the queue (up to the limit).
-       * </p>
-       * Clients should make no assumptions about the format or length of the 
marker. Furthermore, clients should assume
-       * that there is no relationship between markers and message IDs.
-       */
-      public static StreamOptions marker(String marker) {
-         StreamOptions options = new StreamOptions();
-         return options.marker(marker);
-      }
-
-      /**
-       * When more messages are available than can be returned in a single 
request, the client can pick up the next
-       * batch of messages by simply using the {@see StremOptions} returned 
from the previous call in {@code
-       * MessageStream#nextStreamOptions()}. Specifies up to 10 messages (the 
default value) to return. If you do not
-       * specify a value for the limit parameter, the default value of 10 is 
used.
-       */
-      public static StreamOptions limit(int limit) {
-         StreamOptions options = new StreamOptions();
-         return options.limit(limit);
-      }
-
-      /**
-       * The echo parameter determines whether the API returns a client's own 
messages, as determined by the clientId
-       * (UUID) portion of the client. If you do not specify a value, echo 
uses the default value of false. If you are
-       * experimenting with the API, you might want to set echo=true in order 
to see the messages that you posted.
-       */
-      public static StreamOptions echo(boolean echo) {
-         StreamOptions options = new StreamOptions();
-         return options.echo(echo);
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
 
b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
index 3bfe3e0..279ba20 100644
--- 
a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
+++ 
b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java
@@ -18,16 +18,20 @@ package org.jclouds.openstack.marconi.v1.features;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
 import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.Message;
 import org.jclouds.openstack.marconi.v1.domain.MessageStream;
 import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
 import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest;
 import org.testng.annotations.Test;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 
-import static 
org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.echo;
+import static 
org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.echo;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -36,6 +40,8 @@ import static org.testng.Assert.assertTrue;
 @Test(groups = "live", testName = "MessageApiLiveTest", singleThreaded = true)
 public class MessageApiLiveTest extends BaseMarconiApiLiveTest {
 
+   private final Map<String, List<String>> messageIds = Maps.newHashMap();
+
    public void createQueues() throws Exception {
       for (String zoneId : api.getConfiguredZones()) {
          QueueApi queueApi = api.getQueueApiForZone(zoneId);
@@ -49,7 +55,6 @@ public class MessageApiLiveTest extends 
BaseMarconiApiLiveTest {
    public void streamZeroPagesOfMessages() throws Exception {
       for (String zoneId : api.getConfiguredZones()) {
          MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, 
"jclouds-test");
-
          UUID clientId = 
UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
 
          MessageStream messageStream = messageApi.stream(clientId, echo(true));
@@ -80,7 +85,6 @@ public class MessageApiLiveTest extends 
BaseMarconiApiLiveTest {
    public void streamOnePageOfMessages() throws Exception {
       for (String zoneId : api.getConfiguredZones()) {
          MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, 
"jclouds-test");
-
          UUID clientId = 
UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
 
          MessageStream messageStream = messageApi.stream(clientId, echo(true));
@@ -120,14 +124,18 @@ public class MessageApiLiveTest extends 
BaseMarconiApiLiveTest {
    public void streamManyPagesOfMessages() throws Exception {
       for (String zoneId : api.getConfiguredZones()) {
          MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, 
"jclouds-test");
-
          UUID clientId = 
UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         messageIds.put(zoneId, new ArrayList<String>());
 
          MessageStream messageStream = messageApi.stream(clientId, 
echo(true).limit(2));
 
          while(messageStream.nextMarker().isPresent()) {
             assertEquals(Iterables.size(messageStream), 2);
 
+            for (Message message: messageStream) {
+               messageIds.get(zoneId).add(message.getId());
+            }
+
             messageStream = messageApi.stream(clientId, 
messageStream.nextStreamOptions());
          }
 
@@ -136,6 +144,23 @@ public class MessageApiLiveTest extends 
BaseMarconiApiLiveTest {
    }
 
    @Test(dependsOnMethods = { "streamManyPagesOfMessages" })
+   public void listMessagesByIds() throws Exception {
+      for (String zoneId : api.getConfiguredZones()) {
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, 
"jclouds-test");
+         UUID clientId = 
UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+
+         List<Message> messages = messageApi.list(clientId, 
messageIds.get(zoneId));
+
+         assertEquals(messages.size(), 4);
+
+         for (Message message: messages) {
+            assertNotNull(message.getId());
+            assertNotNull(message.getBody());
+         }
+      }
+   }
+
+   @Test(dependsOnMethods = { "listMessagesByIds" })
    public void delete() throws Exception {
       for (String zoneId : api.getConfiguredZones()) {
          QueueApi queueApi = api.getQueueApiForZone(zoneId);

http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
----------------------------------------------------------------------
diff --git 
a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
 
b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
index 8f45aa3..8adb244 100644
--- 
a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
+++ 
b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java
@@ -22,6 +22,7 @@ import com.squareup.okhttp.mockwebserver.MockResponse;
 import com.squareup.okhttp.mockwebserver.MockWebServer;
 import org.jclouds.openstack.marconi.v1.MarconiApi;
 import org.jclouds.openstack.marconi.v1.domain.CreateMessage;
+import org.jclouds.openstack.marconi.v1.domain.Message;
 import org.jclouds.openstack.marconi.v1.domain.MessageStream;
 import org.jclouds.openstack.marconi.v1.domain.MessagesCreated;
 import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest;
@@ -30,7 +31,7 @@ import org.testng.annotations.Test;
 import java.util.List;
 import java.util.UUID;
 
-import static 
org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.limit;
+import static 
org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.limit;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -195,4 +196,35 @@ public class MessageApiMockTest extends 
BaseOpenStackMockTest<MarconiApi> {
          server.shutdown();
       }
    }
+
+   public void listMessagesByIds() throws Exception {
+      MockWebServer server = mockOpenStackServer();
+      server.enqueue(new MockResponse().setBody(accessRackspace));
+      server.enqueue(new 
MockResponse().setResponseCode(200).setBody("[{\"body\": 
\"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User 
Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", 
\"age\": 1596, \"href\": 
\"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883227\", 
\"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User 
Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", 
\"age\": 1596, \"href\": 
\"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883228\", 
\"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User 
Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", 
\"age\": 1596, \"href\": 
\"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883229\", 
\"ttl\": 86400}]"));
+
+      try {
+         MarconiApi api = api(server.getUrl("/").toString(), 
"openstack-marconi");
+         MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", 
"jclouds-test");
+         UUID clientId = 
UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c");
+         List<String> ids = ImmutableList.of("52928896b04a584f24883227", 
"52928896b04a584f24883228", "52928896b04a584f24883229");
+
+         List<Message> messages = messageApi.list(clientId, ids);
+
+         assertEquals(messages.size(), 3);
+
+         for (Message message: messages) {
+            assertNotNull(message.getId());
+            assertNotNull(message.getBody());
+            assertEquals(message.getAge(), 1596);
+            assertEquals(message.getTTL(), 86400);
+         }
+
+         assertEquals(server.getRequestCount(), 2);
+         assertEquals(server.takeRequest().getRequestLine(), "POST /tokens 
HTTP/1.1");
+         assertEquals(server.takeRequest().getRequestLine(), "GET 
/v1/123123/queues/jclouds-test/messages?ids=52928896b04a584f24883227,52928896b04a584f24883228,52928896b04a584f24883229
 HTTP/1.1");
+      }
+      finally {
+         server.shutdown();
+      }
+   }
 }

Reply via email to