This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new db0a4deea2f [IT] Pub/Sub RM should delete subscriptions that were created by Beam (#29957)
db0a4deea2f is described below
commit db0a4deea2f22ee725d22cb3619b4967aa2b2606
Author: Bruno Volpato <[email protected]>
AuthorDate: Fri Jan 12 12:57:47 2024 -0500
[IT] Pub/Sub RM should delete subscriptions that were created by Beam (#29957)
* [IT] Pub/Sub RM should delete subscriptions that were created by Beam
* Add unit tests
---
.../beam/it/gcp/pubsub/PubsubResourceManager.java | 14 ++++++++++-
.../it/gcp/pubsub/PubsubResourceManagerTest.java | 28 +++++++++++++++++++++-
2 files changed, 40 insertions(+), 2 deletions(-)
diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
index 738620c15b7..f947d70efd9 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
@@ -319,7 +319,19 @@ public final class PubsubResourceManager implements ResourceManager {
for (TopicName topic : createdTopics) {
LOG.info("Deleting topic '{}'", topic);
- Failsafe.with(retryOnDeadlineExceeded()).run(() -> topicAdminClient.deleteTopic(topic));
+ Failsafe.with(retryOnDeadlineExceeded())
+ .run(
+ () -> {
+
+ // Delete subscriptions that would be orphaned.
+ for (String topicSub :
+ topicAdminClient.listTopicSubscriptions(topic).iterateAll()) {
+ LOG.info("Deleting subscription '{}'", topicSub);
+ subscriptionAdminClient.deleteSubscription(topicSub);
+ }
+
+ topicAdminClient.deleteTopic(topic);
+ });
}
for (SchemaName schemaName : createdSchemas) {
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java
index d531862d960..08b480895ff 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java
@@ -37,6 +37,8 @@ import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Map;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Before;
@@ -63,8 +65,11 @@ public final class PubsubResourceManagerTest {
private static final String VALID_MESSAGE_ID = "abcdef";
@Mock private TopicAdminClient topicAdminClient;
- @Mock private SubscriptionAdminClient subscriptionAdminClient;
+ @Mock
+ private TopicAdminClient.ListTopicSubscriptionsPagedResponse listTopicSubscriptionsPagedResponse;
+
+ @Mock private SubscriptionAdminClient subscriptionAdminClient;
@Mock private SchemaServiceClient schemaServiceClient;
private Topic topic;
private Subscription subscription;
@@ -75,6 +80,7 @@ public final class PubsubResourceManagerTest {
@Captor private ArgumentCaptor<TopicName> topicNameCaptor;
@Captor private ArgumentCaptor<SubscriptionName> subscriptionNameCaptor;
+ @Captor private ArgumentCaptor<String> stringArgumentCaptor;
@Captor private ArgumentCaptor<PubsubMessage> pubsubMessageCaptor;
@Before
@@ -95,6 +101,8 @@ public final class PubsubResourceManagerTest {
.setName(SubscriptionName.of(PROJECT_ID,
SUBSCRIPTION_NAME).toString())
.build();
when(publisherFactory.createPublisher(any())).thenReturn(publisher);
+ when(topicAdminClient.listTopicSubscriptions(any(TopicName.class)))
+ .thenReturn(listTopicSubscriptionsPagedResponse);
}
@Test
@@ -225,6 +233,7 @@ public final class PubsubResourceManagerTest {
Topic topic1 = Topic.newBuilder().setName(topicName1.toString()).build();
Topic topic2 = Topic.newBuilder().setName(topicName2.toString()).build();
when(topicAdminClient.createTopic(any(TopicName.class))).thenReturn(topic1,
topic2);
+ when(listTopicSubscriptionsPagedResponse.iterateAll()).thenReturn(new ArrayList<>());
testManager.createTopic("topic1");
testManager.createTopic("topic2");
@@ -235,6 +244,23 @@ public final class PubsubResourceManagerTest {
assertThat(topicNameCaptor.getAllValues()).containsExactly(topicName1,
topicName2);
}
+ @Test
+ public void testCleanupTopicsShouldDeleteSubscriptions() {
+ TopicName topicName1 = testManager.getTopicName("topic1");
+ Topic topic1 = Topic.newBuilder().setName(topicName1.toString()).build();
+ when(topicAdminClient.createTopic(any(TopicName.class))).thenReturn(topic1);
+ when(listTopicSubscriptionsPagedResponse.iterateAll())
+ .thenReturn(Arrays.asList("topic1-generated-sub"));
+
+ testManager.createTopic("topic1");
+ testManager.cleanupAll();
+
+ verify(topicAdminClient, times(1)).deleteTopic(topicNameCaptor.capture());
+ assertThat(topicNameCaptor.getAllValues()).containsExactly(topicName1);
+ verify(subscriptionAdminClient, times(1)).deleteSubscription(stringArgumentCaptor.capture());
+ assertThat(stringArgumentCaptor.getAllValues().get(0)).contains("topic1-generated-sub");
+ }
+
@Test
public void testCleanupSubscriptionsShouldDeleteResources() {
SubscriptionName subscriptionName1 =
testManager.getSubscriptionName("topic1-sub0");