Updated Branches: refs/heads/master 2e4c66675 -> 3e54454a2
Get a set of messages by id. Project: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/repo Commit: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/commit/3e54454a Tree: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/tree/3e54454a Diff: http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/diff/3e54454a Branch: refs/heads/master Commit: 3e54454a2a9d2437fa5836e8209c18c414f8554e Parents: 2e4c666 Author: Everett Toews <[email protected]> Authored: Sun Nov 24 18:09:04 2013 -0600 Committer: Everett Toews <[email protected]> Committed: Sun Nov 24 19:50:52 2013 -0600 ---------------------------------------------------------------------- .../marconi/v1/binders/BindIdsToQueryParam.java | 44 +++++++ .../marconi/v1/domain/MessageStream.java | 8 +- .../openstack/marconi/v1/domain/Queues.java | 1 - .../marconi/v1/features/MessageApi.java | 44 +++++-- .../openstack/marconi/v1/features/QueueApi.java | 2 +- .../marconi/v1/functions/ParseMessages.java | 103 ---------------- .../v1/functions/ParseMessagesCreated.java | 2 +- .../v1/functions/ParseMessagesToList.java | 57 +++++++++ .../v1/functions/ParseMessagesToStream.java | 101 ++++++++++++++++ .../marconi/v1/functions/ParseQueueStats.java | 2 +- .../marconi/v1/options/ListQueuesOptions.java | 2 +- .../v1/options/StreamMessagesOptions.java | 119 +++++++++++++++++++ .../marconi/v1/options/StreamOptions.java | 119 ------------------- .../marconi/v1/features/MessageApiLiveTest.java | 33 ++++- .../marconi/v1/features/MessageApiMockTest.java | 34 +++++- 15 files changed, 428 insertions(+), 243 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java ---------------------------------------------------------------------- diff --git 
a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java new file mode 100644 index 0000000..48c2fae --- /dev/null +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/binders/BindIdsToQueryParam.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.jclouds.openstack.marconi.v1.binders; + +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import org.jclouds.http.HttpRequest; +import org.jclouds.rest.Binder; + +import javax.inject.Singleton; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * @author Everett Toews + */ +@Singleton +public class BindIdsToQueryParam implements Binder { + + @SuppressWarnings("unchecked") + @Override + public <R extends HttpRequest> R bindToRequest(R request, Object input) { + checkArgument(input instanceof Iterable<?>, "This binder is only valid for Iterable"); + Iterable<String> ids = (Iterable<String>) checkNotNull(input, "Iterable of Strings"); + checkArgument(Iterables.size(ids) > 0, "You must specify at least one id"); + + return (R) request.toBuilder().replaceQueryParam("ids", Joiner.on(',').join(ids)).build(); + } +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java index 84b67b5..4a77a1b 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/MessageStream.java @@ -21,7 +21,7 @@ import com.google.common.base.Optional; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; -import org.jclouds.openstack.marconi.v1.options.StreamOptions; +import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions; import org.jclouds.openstack.v2_0.domain.Link; 
import org.jclouds.openstack.v2_0.domain.PaginatedCollection; @@ -37,8 +37,8 @@ public class MessageStream extends PaginatedCollection<Message> { * * @return The options necessary to get the next page of messages. */ - public StreamOptions nextStreamOptions() { - return StreamOptions.class.cast(nextMarker().get()); + public StreamMessagesOptions nextStreamOptions() { + return StreamMessagesOptions.class.cast(nextMarker().get()); } @Override @@ -58,7 +58,7 @@ public class MessageStream extends PaginatedCollection<Message> { @Override public Object apply(Link link) { Multimap<String, String> queryParams = queryParser().apply(link.getHref().getRawQuery()); - StreamOptions paginationOptions = StreamOptions.Builder.queryParameters(queryParams); + StreamMessagesOptions paginationOptions = StreamMessagesOptions.Builder.queryParameters(queryParams); return paginationOptions; } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java index 6ef6a93..d5f3d74 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/domain/Queues.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Multimap; import org.jclouds.openstack.marconi.v1.options.ListQueuesOptions; -import org.jclouds.openstack.marconi.v1.options.StreamOptions; import org.jclouds.openstack.v2_0.domain.Link; import org.jclouds.openstack.v2_0.domain.PaginatedCollection; 
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java index ecfb2c5..9b9fcb8 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/MessageApi.java @@ -17,14 +17,16 @@ package org.jclouds.openstack.marconi.v1.features; import org.jclouds.Fallbacks; -import org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks; import org.jclouds.openstack.keystone.v2_0.filters.AuthenticateRequest; +import org.jclouds.openstack.marconi.v1.binders.BindIdsToQueryParam; import org.jclouds.openstack.marconi.v1.domain.CreateMessage; -import org.jclouds.openstack.marconi.v1.domain.MessagesCreated; +import org.jclouds.openstack.marconi.v1.domain.Message; import org.jclouds.openstack.marconi.v1.domain.MessageStream; -import org.jclouds.openstack.marconi.v1.functions.ParseMessages; +import org.jclouds.openstack.marconi.v1.domain.MessagesCreated; +import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream; import org.jclouds.openstack.marconi.v1.functions.ParseMessagesCreated; -import org.jclouds.openstack.marconi.v1.options.StreamOptions; +import org.jclouds.openstack.marconi.v1.functions.ParseMessagesToList; +import org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions; import org.jclouds.rest.annotations.BinderParam; import org.jclouds.rest.annotations.Fallback; import org.jclouds.rest.annotations.RequestFilters; @@ -38,10 +40,14 @@ import javax.ws.rs.GET; import javax.ws.rs.HeaderParam; import javax.ws.rs.POST; import javax.ws.rs.Path; +import javax.ws.rs.QueryParam; 
import javax.ws.rs.core.MediaType; import java.util.List; import java.util.UUID; +import static org.jclouds.Fallbacks.EmptyListOnNotFoundOr404; +import static org.jclouds.openstack.keystone.v2_0.KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404; + /** * Provides access to Messages via their REST API. * @@ -69,12 +75,36 @@ public interface MessageApi { MessagesCreated create(@HeaderParam("Client-ID") UUID clientId, @BinderParam(BindToJsonPayload.class) List<CreateMessage> messages); + /** + * Streams the messages off of a queue. In a very active queue it's possible that you could continuously stream + * messages indefinitely. + * + * @param clientId A UUID for each client instance. + * @param options Options for streaming messages to your client. + */ @Named("message:stream") @GET - @ResponseParser(ParseMessages.class) + @ResponseParser(ParseMessagesToStream.class) @Consumes(MediaType.APPLICATION_JSON) - @Fallback(KeystoneFallbacks.EmptyPaginatedCollectionOnNotFoundOr404.class) + @Fallback(EmptyPaginatedCollectionOnNotFoundOr404.class) @Path("/messages") MessageStream stream(@HeaderParam("Client-ID") UUID clientId, - StreamOptions... options); + StreamMessagesOptions... options); + + /** + * List specific messages. Unlike the stream method, a client's own messages are always returned in this operation. + * + * @param clientId A UUID for each client instance. + * @param ids Specifies the IDs of the messages to get. 
+ */ + @Named("message:list") + @GET + @ResponseParser(ParseMessagesToList.class) + @Consumes(MediaType.APPLICATION_JSON) + @Path("/messages") + @Fallback(EmptyListOnNotFoundOr404.class) + List<Message> list(@HeaderParam("Client-ID") UUID clientId, + @BinderParam(BindIdsToQueryParam.class) Iterable<String> ids); + + // TODO: list by claim id when claim API done } http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java index a2fc044..c12bbf3 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/features/QueueApi.java @@ -110,7 +110,7 @@ public interface QueueApi { /** * Use this method to manually page through the list of queues. */ - @Named("record:list") + @Named("queue:list") @GET @ResponseParser(ParseQueues.class) @Consumes(MediaType.APPLICATION_JSON) http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java deleted file mode 100644 index 163483d..0000000 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessages.java +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jclouds.openstack.marconi.v1.functions; - -import com.google.common.base.Function; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import org.jclouds.http.HttpResponse; -import org.jclouds.http.functions.ParseJson; -import org.jclouds.openstack.marconi.v1.domain.Message; -import org.jclouds.openstack.marconi.v1.domain.MessageStream; -import org.jclouds.openstack.v2_0.domain.Link; -import org.jclouds.openstack.v2_0.domain.PaginatedCollection; - -import javax.inject.Inject; -import java.beans.ConstructorProperties; -import java.util.List; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * @author Everett Toews - */ -public class ParseMessages implements Function<HttpResponse, MessageStream> { - - private final ParseJson<MessagesWithHref> json; - - @Inject - ParseMessages(ParseJson<MessagesWithHref> json) { - this.json = checkNotNull(json, "json"); - } - - @Override - public MessageStream apply(HttpResponse response) { - // An empty message stream has a 204 response code - if (response.getStatusCode() == 204) { - return new Messages(ImmutableSet.<Message> of(), ImmutableSet.<Link> of()); - } - - MessagesWithHref messagesWithHref = json.apply(response); - 
Iterable<Message> messages = Iterables.transform(messagesWithHref, TO_MESSAGE); - - return new Messages(messages, messagesWithHref.getLinks()); - } - - private static String getMessageId(String rawMessageHref) { - // strip off everything but the message id - return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1); - } - - protected static final Function<MessageWithHref, Message> TO_MESSAGE = new Function<MessageWithHref, Message>() { - @Override - public Message apply(MessageWithHref messageWithHref) { - return messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build(); - } - }; - - protected static final Function<String, String> TO_MESSAGE_ID = new Function<String, String>() { - @Override - public String apply(String messageIdWithHref) { - return getMessageId(messageIdWithHref); - } - }; - - private static class Messages extends MessageStream { - - @ConstructorProperties({ "messages", "links" }) - protected Messages(Iterable<Message> messages, Iterable<Link> links) { - super(messages, links); - } - } - - private static class MessagesWithHref extends PaginatedCollection<MessageWithHref> { - - @ConstructorProperties({ "messages", "links" }) - protected MessagesWithHref(Iterable<MessageWithHref> messagesWithHref, Iterable<Link> links) { - super(messagesWithHref, links); - } - } - - private static class MessageWithHref extends Message { - - @ConstructorProperties({ "href", "ttl", "body", "age" }) - protected MessageWithHref(String href, int ttl, String body, int age) { - super(href, ttl, body, age); - } - } -} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java 
b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java index d2d4d83..9be3722 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesCreated.java @@ -26,7 +26,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesCreated; import java.util.List; import static com.google.common.base.Preconditions.checkNotNull; -import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID; /** * This parses the messages created on a queue. http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java new file mode 100644 index 0000000..d7d0b77 --- /dev/null +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToList.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.openstack.marconi.v1.functions; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import org.jclouds.http.HttpResponse; +import org.jclouds.http.functions.ParseJson; +import org.jclouds.openstack.marconi.v1.domain.Message; + +import javax.inject.Inject; +import java.util.List; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.collect.Iterables.transform; +import static com.google.common.collect.Lists.newArrayList; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.MessageWithHref; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE; + +/** + * @author Everett Toews + */ +public class ParseMessagesToList implements Function<HttpResponse, List<Message>> { + + private final ParseJson<List<MessageWithHref>> json; + + @Inject + ParseMessagesToList(ParseJson<List<MessageWithHref>> json) { + this.json = checkNotNull(json, "json"); + } + + @Override + public List<Message> apply(HttpResponse response) { + // An empty message stream has a 204 response code + if (response.getStatusCode() == 204) { + return ImmutableList.of(); + } + + List<MessageWithHref> messagesWithHref = json.apply(response); + return Lists.newArrayList(transform(messagesWithHref, TO_MESSAGE)); + } +} 
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java new file mode 100644 index 0000000..76a9ba7 --- /dev/null +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseMessagesToStream.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.jclouds.openstack.marconi.v1.functions; + +import com.google.common.base.Function; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import org.jclouds.http.HttpResponse; +import org.jclouds.http.functions.ParseJson; +import org.jclouds.openstack.marconi.v1.domain.Message; +import org.jclouds.openstack.marconi.v1.domain.MessageStream; +import org.jclouds.openstack.v2_0.domain.Link; +import org.jclouds.openstack.v2_0.domain.PaginatedCollection; + +import javax.inject.Inject; +import java.beans.ConstructorProperties; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * @author Everett Toews + */ +public class ParseMessagesToStream implements Function<HttpResponse, MessageStream> { + + private final ParseJson<MessagesWithHref> json; + + @Inject + ParseMessagesToStream(ParseJson<MessagesWithHref> json) { + this.json = checkNotNull(json, "json"); + } + + @Override + public MessageStream apply(HttpResponse response) { + // An empty message stream has a 204 response code + if (response.getStatusCode() == 204) { + return new Messages(ImmutableSet.<Message> of(), ImmutableSet.<Link> of()); + } + + MessagesWithHref messagesWithHref = json.apply(response); + Iterable<Message> messages = Iterables.transform(messagesWithHref, TO_MESSAGE); + + return new Messages(messages, messagesWithHref.getLinks()); + } + + private static String getMessageId(String rawMessageHref) { + // strip off everything but the message id + return rawMessageHref.substring(rawMessageHref.lastIndexOf('/')+1); + } + + protected static final Function<MessageWithHref, Message> TO_MESSAGE = new Function<MessageWithHref, Message>() { + @Override + public Message apply(MessageWithHref messageWithHref) { + return messageWithHref.toBuilder().id(getMessageId(messageWithHref.getId())).build(); + } + }; + + protected static final Function<String, String> TO_MESSAGE_ID = new Function<String, String>() { + @Override + public String 
apply(String messageIdWithHref) { + return getMessageId(messageIdWithHref); + } + }; + + private static class Messages extends MessageStream { + + @ConstructorProperties({ "messages", "links" }) + protected Messages(Iterable<Message> messages, Iterable<Link> links) { + super(messages, links); + } + } + + private static class MessagesWithHref extends PaginatedCollection<MessageWithHref> { + + @ConstructorProperties({ "messages", "links" }) + protected MessagesWithHref(Iterable<MessageWithHref> messagesWithHref, Iterable<Link> links) { + super(messagesWithHref, links); + } + } + + protected static class MessageWithHref extends Message { + + @ConstructorProperties({ "href", "ttl", "body", "age" }) + protected MessageWithHref(String href, int ttl, String body, int age) { + super(href, ttl, body, age); + } + } +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java index fb87c6c..e6f0ee2 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/functions/ParseQueueStats.java @@ -25,7 +25,7 @@ import org.jclouds.openstack.marconi.v1.domain.MessagesStats; import org.jclouds.openstack.marconi.v1.domain.QueueStats; import static com.google.common.base.Preconditions.checkNotNull; -import static org.jclouds.openstack.marconi.v1.functions.ParseMessages.TO_MESSAGE_ID; +import static org.jclouds.openstack.marconi.v1.functions.ParseMessagesToStream.TO_MESSAGE_ID; /** * This parses the stats of a queue. 
http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java index 1ce60d4..929529d 100644 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/ListQueuesOptions.java @@ -67,7 +67,7 @@ public class ListQueuesOptions extends PaginationOptions { } /** - * @return The String representation of the marker for these StreamOptions. + * @return The String representation of the marker for these StreamMessagesOptions. */ public String getMarker() { return Iterables.getOnlyElement(queryParameters.get("marker")); http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java new file mode 100644 index 0000000..b0ff396 --- /dev/null +++ b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamMessagesOptions.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.jclouds.openstack.marconi.v1.options; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Multimap; +import org.jclouds.openstack.v2_0.options.PaginationOptions; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Options used to control the messages returned in the response. + */ +public class StreamMessagesOptions extends PaginationOptions { + + public static final StreamMessagesOptions NONE = new StreamMessagesOptions(); + + /** + * {@inheritDoc} + */ + @Override + public StreamMessagesOptions queryParameters(Multimap<String, String> queryParams) { + checkNotNull(queryParams, "queryParams"); + queryParameters.putAll(queryParams); + return this; + } + + /** + * @see Builder#marker(String) + */ + @Override + public StreamMessagesOptions marker(String marker) { + super.marker(marker); + return this; + } + + /** + * @see Builder#limit(int) + */ + @Override + public StreamMessagesOptions limit(int limit) { + super.limit(limit); + return this; + + } + + /** + * @see Builder#echo(boolean) + */ + public StreamMessagesOptions echo(boolean echo) { + queryParameters.put("echo", Boolean.toString(echo)); + return this; + } + + /** + * @return The String representation of the marker for these StreamMessagesOptions. 
+ */ + public String getMarker() { + return Iterables.getOnlyElement(queryParameters.get("marker")); + } + + public static class Builder { + /** + * @see PaginationOptions#queryParameters(Multimap) + */ + public static StreamMessagesOptions queryParameters(Multimap<String, String> queryParams) { + StreamMessagesOptions options = new StreamMessagesOptions(); + return options.queryParameters(queryParams); + } + + /** + * Specifies an opaque string that the client can use to request the next batch of messages. The marker parameter + * communicates to the server which messages the client has already received. If you do not specify a value, the + * API returns all messages at the head of the queue (up to the limit). + * <p> + * Clients should make no assumptions about the format or length of the marker. Furthermore, clients should assume + * that there is no relationship between markers and message IDs. + */ + public static StreamMessagesOptions marker(String marker) { + StreamMessagesOptions options = new StreamMessagesOptions(); + return options.marker(marker); + } + + /** + * When more messages are available than can be returned in a single request, the client can pick up the next + * batch of messages by simply using the {@link StreamMessagesOptions} returned from the previous call in {@code + * MessageStream#nextStreamOptions()}. Specifies up to 10 messages (the default value) to return. If you do not + * specify a value for the limit parameter, the default value of 10 is used. + */ + public static StreamMessagesOptions limit(int limit) { + StreamMessagesOptions options = new StreamMessagesOptions(); + return options.limit(limit); + } + + /** + * The echo parameter determines whether the API returns a client's own messages, as determined by the clientId + * (UUID) portion of the client. If you do not specify a value, echo uses the default value of false. If you are + * experimenting with the API, you might want to set echo=true in order to see the messages that you posted. 
+ */ + public static StreamMessagesOptions echo(boolean echo) { + StreamMessagesOptions options = new StreamMessagesOptions(); + return options.echo(echo); + } + } +} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java b/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java deleted file mode 100644 index be2fffb..0000000 --- a/openstack-marconi/src/main/java/org/jclouds/openstack/marconi/v1/options/StreamOptions.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.jclouds.openstack.marconi.v1.options; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Multimap; -import org.jclouds.openstack.v2_0.options.PaginationOptions; - -import static com.google.common.base.Preconditions.checkNotNull; - -/** - * Options used to control the messages returned in the response. 
- */ -public class StreamOptions extends PaginationOptions { - - public static final StreamOptions NONE = new StreamOptions(); - - /** - * {@inheritDoc} - */ - @Override - public StreamOptions queryParameters(Multimap<String, String> queryParams) { - checkNotNull(queryParams, "queryParams"); - queryParameters.putAll(queryParams); - return this; - } - - /** - * @see Builder#marker(String) - */ - @Override - public StreamOptions marker(String marker) { - super.marker(marker); - return this; - } - - /** - * @see Builder#limit(int) - */ - @Override - public StreamOptions limit(int limit) { - super.limit(limit); - return this; - - } - - /** - * @see Builder#echo(boolean) - */ - public StreamOptions echo(boolean echo) { - queryParameters.put("echo", Boolean.toString(echo)); - return this; - } - - /** - * @return The String representation of the marker for these StreamOptions. - */ - public String getMarker() { - return Iterables.getOnlyElement(queryParameters.get("marker")); - } - - public static class Builder { - /** - * @see PaginationOptions#queryParameters(Multimap) - */ - public static StreamOptions queryParameters(Multimap<String, String> queryParams) { - StreamOptions options = new StreamOptions(); - return options.queryParameters(queryParams); - } - - /** - * Specifies an opaque string that the client can use to request the next batch of messages. The marker parameter - * communicates to the server which messages the client has already received. If you do not specify a value, the - * API returns all messages at the head of the queue (up to the limit). - * </p> - * Clients should make no assumptions about the format or length of the marker. Furthermore, clients should assume - * that there is no relationship between markers and message IDs. 
- */ - public static StreamOptions marker(String marker) { - StreamOptions options = new StreamOptions(); - return options.marker(marker); - } - - /** - * When more messages are available than can be returned in a single request, the client can pick up the next - * batch of messages by simply using the {@see StremOptions} returned from the previous call in {@code - * MessageStream#nextStreamOptions()}. Specifies up to 10 messages (the default value) to return. If you do not - * specify a value for the limit parameter, the default value of 10 is used. - */ - public static StreamOptions limit(int limit) { - StreamOptions options = new StreamOptions(); - return options.limit(limit); - } - - /** - * The echo parameter determines whether the API returns a client's own messages, as determined by the clientId - * (UUID) portion of the client. If you do not specify a value, echo uses the default value of false. If you are - * experimenting with the API, you might want to set echo=true in order to see the messages that you posted. 
- */ - public static StreamOptions echo(boolean echo) { - StreamOptions options = new StreamOptions(); - return options.echo(echo); - } - } -} http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java index 3bfe3e0..279ba20 100644 --- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java +++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiLiveTest.java @@ -18,16 +18,20 @@ package org.jclouds.openstack.marconi.v1.features; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import com.google.common.collect.Maps; import org.jclouds.openstack.marconi.v1.domain.CreateMessage; +import org.jclouds.openstack.marconi.v1.domain.Message; import org.jclouds.openstack.marconi.v1.domain.MessageStream; import org.jclouds.openstack.marconi.v1.domain.MessagesCreated; import org.jclouds.openstack.marconi.v1.internal.BaseMarconiApiLiveTest; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; -import static org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.echo; +import static org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.echo; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -36,6 +40,8 @@ import static org.testng.Assert.assertTrue; @Test(groups = "live", testName = "MessageApiLiveTest", singleThreaded = true) public class MessageApiLiveTest extends 
BaseMarconiApiLiveTest { + private final Map<String, List<String>> messageIds = Maps.newHashMap(); + public void createQueues() throws Exception { for (String zoneId : api.getConfiguredZones()) { QueueApi queueApi = api.getQueueApiForZone(zoneId); @@ -49,7 +55,6 @@ public class MessageApiLiveTest extends BaseMarconiApiLiveTest { public void streamZeroPagesOfMessages() throws Exception { for (String zoneId : api.getConfiguredZones()) { MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test"); - UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); MessageStream messageStream = messageApi.stream(clientId, echo(true)); @@ -80,7 +85,6 @@ public class MessageApiLiveTest extends BaseMarconiApiLiveTest { public void streamOnePageOfMessages() throws Exception { for (String zoneId : api.getConfiguredZones()) { MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test"); - UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); MessageStream messageStream = messageApi.stream(clientId, echo(true)); @@ -120,14 +124,18 @@ public class MessageApiLiveTest extends BaseMarconiApiLiveTest { public void streamManyPagesOfMessages() throws Exception { for (String zoneId : api.getConfiguredZones()) { MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test"); - UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + messageIds.put(zoneId, new ArrayList<String>()); MessageStream messageStream = messageApi.stream(clientId, echo(true).limit(2)); while(messageStream.nextMarker().isPresent()) { assertEquals(Iterables.size(messageStream), 2); + for (Message message: messageStream) { + messageIds.get(zoneId).add(message.getId()); + } + messageStream = messageApi.stream(clientId, messageStream.nextStreamOptions()); } @@ -136,6 +144,23 @@ public class MessageApiLiveTest extends BaseMarconiApiLiveTest { } @Test(dependsOnMethods = { "streamManyPagesOfMessages" }) + public void 
listMessagesByIds() throws Exception { + for (String zoneId : api.getConfiguredZones()) { + MessageApi messageApi = api.getMessageApiForZoneAndQueue(zoneId, "jclouds-test"); + UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + + List<Message> messages = messageApi.list(clientId, messageIds.get(zoneId)); + + assertEquals(messages.size(), 4); + + for (Message message: messages) { + assertNotNull(message.getId()); + assertNotNull(message.getBody()); + } + } + } + + @Test(dependsOnMethods = { "listMessagesByIds" }) public void delete() throws Exception { for (String zoneId : api.getConfiguredZones()) { QueueApi queueApi = api.getQueueApiForZone(zoneId); http://git-wip-us.apache.org/repos/asf/jclouds-labs-openstack/blob/3e54454a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java ---------------------------------------------------------------------- diff --git a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java index 8f45aa3..8adb244 100644 --- a/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java +++ b/openstack-marconi/src/test/java/org/jclouds/openstack/marconi/v1/features/MessageApiMockTest.java @@ -22,6 +22,7 @@ import com.squareup.okhttp.mockwebserver.MockResponse; import com.squareup.okhttp.mockwebserver.MockWebServer; import org.jclouds.openstack.marconi.v1.MarconiApi; import org.jclouds.openstack.marconi.v1.domain.CreateMessage; +import org.jclouds.openstack.marconi.v1.domain.Message; import org.jclouds.openstack.marconi.v1.domain.MessageStream; import org.jclouds.openstack.marconi.v1.domain.MessagesCreated; import org.jclouds.openstack.v2_0.internal.BaseOpenStackMockTest; @@ -30,7 +31,7 @@ import org.testng.annotations.Test; import java.util.List; import java.util.UUID; -import static 
org.jclouds.openstack.marconi.v1.options.StreamOptions.Builder.limit; +import static org.jclouds.openstack.marconi.v1.options.StreamMessagesOptions.Builder.limit; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; @@ -195,4 +196,35 @@ public class MessageApiMockTest extends BaseOpenStackMockTest<MarconiApi> { server.shutdown(); } } + + public void listMessagesByIds() throws Exception { + MockWebServer server = mockOpenStackServer(); + server.enqueue(new MockResponse().setBody(accessRackspace)); + server.enqueue(new MockResponse().setResponseCode(200).setBody("[{\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"SF Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1596, \"href\": \"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883227\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"Austin Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1596, \"href\": \"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883228\", \"ttl\": 86400}, {\"body\": \"{\\\"event\\\":{\\\"name\\\":\\\"HK Java User Group\\\",\\\"attendees\\\":[\\\"bob\\\",\\\"jim\\\",\\\"sally\\\"]}}\", \"age\": 1596, \"href\": \"/v1/queues/jclouds-test/messages/messages/52928896b04a584f24883229\", \"ttl\": 86400}]")); + + try { + MarconiApi api = api(server.getUrl("/").toString(), "openstack-marconi"); + MessageApi messageApi = api.getMessageApiForZoneAndQueue("DFW", "jclouds-test"); + UUID clientId = UUID.fromString("3381af92-2b9e-11e3-b191-71861300734c"); + List<String> ids = ImmutableList.of("52928896b04a584f24883227", "52928896b04a584f24883228", "52928896b04a584f24883229"); + + List<Message> messages = messageApi.list(clientId, ids); + + assertEquals(messages.size(), 3); + + for (Message message: messages) { + assertNotNull(message.getId()); + assertNotNull(message.getBody()); + 
assertEquals(message.getAge(), 1596); + assertEquals(message.getTTL(), 86400); + } + + assertEquals(server.getRequestCount(), 2); + assertEquals(server.takeRequest().getRequestLine(), "POST /tokens HTTP/1.1"); + assertEquals(server.takeRequest().getRequestLine(), "GET /v1/123123/queues/jclouds-test/messages?ids=52928896b04a584f24883227,52928896b04a584f24883228,52928896b04a584f24883229 HTTP/1.1"); + } + finally { + server.shutdown(); + } + } }
