[
https://issues.apache.org/jira/browse/BEAM-4570?focusedWorklogId=113615&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-113615
]
ASF GitHub Bot logged work on BEAM-4570:
----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Jun/18 07:26
Start Date: 20/Jun/18 07:26
Worklog Time Spent: 10m
Work Description: iemejia closed pull request #5664: [BEAM-4570] Take
pagination into account on PubsubJsonClient list methods
URL: https://github.com/apache/beam/pull/5664
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
index 198178773f6..2ff828e4122 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java
@@ -23,6 +23,8 @@
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Builder;
+import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
+import com.google.api.services.pubsub.Pubsub.Projects.Topics;
import com.google.api.services.pubsub.model.AcknowledgeRequest;
import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
import com.google.api.services.pubsub.model.ListTopicsResponse;
@@ -251,16 +253,21 @@ public void deleteTopic(TopicPath topic) throws
IOException {
@Override
public List<TopicPath> listTopics(ProjectPath project) throws IOException {
- ListTopicsResponse response = pubsub.projects()
- .topics()
- .list(project.getPath())
- .execute();
+ Topics.List request = pubsub.projects().topics().list(project.getPath());
+ ListTopicsResponse response = request.execute();
if (response.getTopics() == null || response.getTopics().isEmpty()) {
return ImmutableList.of();
}
List<TopicPath> topics = new ArrayList<>(response.getTopics().size());
- for (Topic topic : response.getTopics()) {
- topics.add(topicPathFromPath(topic.getName()));
+ while (true) {
+ for (Topic topic : response.getTopics()) {
+ topics.add(topicPathFromPath(topic.getName()));
+ }
+ if (Strings.isNullOrEmpty(response.getNextPageToken())) {
+ break;
+ }
+ request.setPageToken(response.getNextPageToken());
+ response = request.execute();
}
return topics;
}
@@ -289,18 +296,23 @@ public void deleteSubscription(SubscriptionPath
subscription) throws IOException
@Override
public List<SubscriptionPath> listSubscriptions(ProjectPath project,
TopicPath topic)
throws IOException {
- ListSubscriptionsResponse response = pubsub.projects()
- .subscriptions()
- .list(project.getPath())
- .execute();
+ Subscriptions.List request =
pubsub.projects().subscriptions().list(project.getPath());
+ ListSubscriptionsResponse response = request.execute();
if (response.getSubscriptions() == null ||
response.getSubscriptions().isEmpty()) {
return ImmutableList.of();
}
List<SubscriptionPath> subscriptions = new
ArrayList<>(response.getSubscriptions().size());
- for (Subscription subscription : response.getSubscriptions()) {
- if (subscription.getTopic().equals(topic.getPath())) {
- subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+ while (true) {
+ for (Subscription subscription : response.getSubscriptions()) {
+ if (subscription.getTopic().equals(topic.getPath())) {
+ subscriptions.add(subscriptionPathFromPath(subscription.getName()));
+ }
+ }
+ if (Strings.isNullOrEmpty(response.getNextPageToken())) {
+ break;
}
+ request.setPageToken(response.getNextPageToken());
+ response = request.execute();
}
return subscriptions;
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
index 2922057a76a..88f7d4f22e2 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java
@@ -19,23 +19,32 @@
package org.apache.beam.sdk.io.gcp.pubsub;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
import com.google.api.services.pubsub.Pubsub;
+import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
+import com.google.api.services.pubsub.Pubsub.Projects.Topics;
+import com.google.api.services.pubsub.model.ListSubscriptionsResponse;
+import com.google.api.services.pubsub.model.ListTopicsResponse;
import com.google.api.services.pubsub.model.PublishRequest;
import com.google.api.services.pubsub.model.PublishResponse;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.api.services.pubsub.model.PullRequest;
import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
+import com.google.api.services.pubsub.model.Subscription;
+import com.google.api.services.pubsub.model.Topic;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.IncomingMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.OutgoingMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.junit.After;
@@ -53,6 +62,7 @@
private Pubsub mockPubsub;
private PubsubClient client;
+ private static final ProjectPath PROJECT =
PubsubClient.projectPathFromId("testProject");
private static final TopicPath TOPIC =
PubsubClient.topicPathFromName("testProject", "testTopic");
private static final SubscriptionPath SUBSCRIPTION =
PubsubClient.subscriptionPathFromName("testProject", "testSubscription");
@@ -96,7 +106,7 @@ public void pullOneMessage() throws IOException {
.setAckId(ACK_ID);
PullResponse expectedResponse =
new
PullResponse().setReceivedMessages(ImmutableList.of(expectedReceivedMessage));
- Mockito.when((Object) (mockPubsub.projects()
+ when((Object) (mockPubsub.projects()
.subscriptions()
.pull(expectedSubscription, expectedRequest)
.execute()))
@@ -125,7 +135,7 @@ public void publishOneMessage() throws IOException {
.setMessages(ImmutableList.of(expectedPubsubMessage));
PublishResponse expectedResponse = new PublishResponse()
.setMessageIds(ImmutableList.of(MESSAGE_ID));
- Mockito.when((Object) (mockPubsub.projects()
+ when((Object) (mockPubsub.projects()
.topics()
.publish(expectedTopic, expectedRequest)
.execute()))
@@ -151,7 +161,7 @@ public void
publishOneMessageWithOnlyTimestampAndIdAttributes() throws IOExcepti
.setMessages(ImmutableList.of(expectedPubsubMessage));
PublishResponse expectedResponse = new PublishResponse()
.setMessageIds(ImmutableList.of(MESSAGE_ID));
- Mockito.when((Object) (mockPubsub.projects()
+ when((Object) (mockPubsub.projects()
.topics()
.publish(expectedTopic, expectedRequest)
.execute()))
@@ -179,7 +189,7 @@ public void publishOneMessageWithNoTimestampOrIdAttribute()
throws IOException {
.setMessages(ImmutableList.of(expectedPubsubMessage));
PublishResponse expectedResponse = new PublishResponse()
.setMessageIds(ImmutableList.of(MESSAGE_ID));
- Mockito.when((Object) (mockPubsub.projects()
+ when((Object) (mockPubsub.projects()
.topics()
.publish(expectedTopic, expectedRequest)
.execute()))
@@ -191,4 +201,51 @@ public void
publishOneMessageWithNoTimestampOrIdAttribute() throws IOException {
int n = client.publish(TOPIC, ImmutableList.of(actualMessage));
assertEquals(1, n);
}
+
+ @Test
+ public void listTopics() throws Exception {
+ ListTopicsResponse expectedResponse1 = new ListTopicsResponse();
+ expectedResponse1.setTopics(Collections.singletonList(buildTopic(1)));
+ expectedResponse1.setNextPageToken("AVgJH3Z7aHxiDBs");
+
+ ListTopicsResponse expectedResponse2 = new ListTopicsResponse();
+ expectedResponse2.setTopics(Collections.singletonList(buildTopic(2)));
+
+ Topics.List request =
mockPubsub.projects().topics().list(PROJECT.getPath());
+ when((Object) (request.execute())).thenReturn(expectedResponse1,
expectedResponse2);
+
+ List<TopicPath> topicPaths = client.listTopics(PROJECT);
+ assertEquals(2, topicPaths.size());
+ }
+
+ private static Topic buildTopic(int i) {
+ Topic topic = new Topic();
+ topic.setName(PubsubClient.topicPathFromName(PROJECT.getId(), "Topic" +
i).getPath());
+ return topic;
+ }
+
+ @Test
+ public void listSubscriptions() throws Exception {
+ ListSubscriptionsResponse expectedResponse1 = new
ListSubscriptionsResponse();
+
expectedResponse1.setSubscriptions(Collections.singletonList(buildSubscription(1)));
+ expectedResponse1.setNextPageToken("AVgJH3Z7aHxiDBs");
+
+ ListSubscriptionsResponse expectedResponse2 = new
ListSubscriptionsResponse();
+
expectedResponse2.setSubscriptions(Collections.singletonList(buildSubscription(2)));
+
+ Subscriptions.List request =
mockPubsub.projects().subscriptions().list(PROJECT.getPath());
+ when((Object) (request.execute())).thenReturn(expectedResponse1,
expectedResponse2);
+
+ final TopicPath topic101 = PubsubClient.topicPathFromName("testProject",
"Topic2");
+ List<SubscriptionPath> subscriptionPaths =
client.listSubscriptions(PROJECT, topic101);
+ assertEquals(1, subscriptionPaths.size());
+ }
+
+ private static Subscription buildSubscription(int i) {
+ Subscription subscription = new Subscription();
+ subscription.setName(
+ PubsubClient.subscriptionPathFromName(PROJECT.getId(), "Subscription"
+ i).getPath());
+ subscription.setTopic(PubsubClient.topicPathFromName(PROJECT.getId(),
"Topic" + i).getPath());
+ return subscription;
+ }
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 113615)
Time Spent: 50m (was: 40m)
> PubsubJsonClient listTopics and listSubscriptions give wrong results (ignore
> pagination)
> ----------------------------------------------------------------------------------------
>
> Key: BEAM-4570
> URL: https://issues.apache.org/jira/browse/BEAM-4570
> Project: Beam
> Issue Type: Bug
> Components: io-java-gcp
> Affects Versions: 2.6.0
> Reporter: Ismaël Mejía
> Assignee: Ismaël Mejía
> Priority: Major
> Time Spent: 50m
> Remaining Estimate: 0h
>
> Both listTopics and listSubscriptions do not take into account the
> nextPageToken currently so users with many topics or subscriptions get
> incorrect answers.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)