This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new db0a4deea2f [IT] Pub/Sub RM should delete subscriptions that were created by Beam (#29957)
db0a4deea2f is described below

commit db0a4deea2f22ee725d22cb3619b4967aa2b2606
Author: Bruno Volpato <[email protected]>
AuthorDate: Fri Jan 12 12:57:47 2024 -0500

    [IT] Pub/Sub RM should delete subscriptions that were created by Beam (#29957)
    
    * [IT] Pub/Sub RM should delete subscriptions that were created by Beam
    
    * Add unit tests
---
 .../beam/it/gcp/pubsub/PubsubResourceManager.java  | 14 ++++++++++-
 .../it/gcp/pubsub/PubsubResourceManagerTest.java   | 28 +++++++++++++++++++++-
 2 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
index 738620c15b7..f947d70efd9 100644
--- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManager.java
@@ -319,7 +319,19 @@ public final class PubsubResourceManager implements ResourceManager {
 
       for (TopicName topic : createdTopics) {
         LOG.info("Deleting topic '{}'", topic);
-        Failsafe.with(retryOnDeadlineExceeded()).run(() -> topicAdminClient.deleteTopic(topic));
+        Failsafe.with(retryOnDeadlineExceeded())
+            .run(
+                () -> {
+
+                  // Delete subscriptions that would be orphaned.
+                  for (String topicSub :
+                      topicAdminClient.listTopicSubscriptions(topic).iterateAll()) {
+                    LOG.info("Deleting subscription '{}'", topicSub);
+                    subscriptionAdminClient.deleteSubscription(topicSub);
+                  }
+
+                  topicAdminClient.deleteTopic(topic);
+                });
       }
 
       for (SchemaName schemaName : createdSchemas) {
diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java
index d531862d960..08b480895ff 100644
--- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java
+++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/pubsub/PubsubResourceManagerTest.java
@@ -37,6 +37,8 @@ import com.google.pubsub.v1.SubscriptionName;
 import com.google.pubsub.v1.Topic;
 import com.google.pubsub.v1.TopicName;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Map;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Before;
@@ -63,8 +65,11 @@ public final class PubsubResourceManagerTest {
   private static final String VALID_MESSAGE_ID = "abcdef";
 
   @Mock private TopicAdminClient topicAdminClient;
-  @Mock private SubscriptionAdminClient subscriptionAdminClient;
 
+  @Mock
+  private TopicAdminClient.ListTopicSubscriptionsPagedResponse listTopicSubscriptionsPagedResponse;
+
+  @Mock private SubscriptionAdminClient subscriptionAdminClient;
   @Mock private SchemaServiceClient schemaServiceClient;
   private Topic topic;
   private Subscription subscription;
@@ -75,6 +80,7 @@ public final class PubsubResourceManagerTest {
 
   @Captor private ArgumentCaptor<TopicName> topicNameCaptor;
   @Captor private ArgumentCaptor<SubscriptionName> subscriptionNameCaptor;
+  @Captor private ArgumentCaptor<String> stringArgumentCaptor;
   @Captor private ArgumentCaptor<PubsubMessage> pubsubMessageCaptor;
 
   @Before
@@ -95,6 +101,8 @@ public final class PubsubResourceManagerTest {
             .setName(SubscriptionName.of(PROJECT_ID, SUBSCRIPTION_NAME).toString())
             .build();
     when(publisherFactory.createPublisher(any())).thenReturn(publisher);
+    when(topicAdminClient.listTopicSubscriptions(any(TopicName.class)))
+        .thenReturn(listTopicSubscriptionsPagedResponse);
   }
 
   @Test
@@ -225,6 +233,7 @@ public final class PubsubResourceManagerTest {
     Topic topic1 = Topic.newBuilder().setName(topicName1.toString()).build();
     Topic topic2 = Topic.newBuilder().setName(topicName2.toString()).build();
     when(topicAdminClient.createTopic(any(TopicName.class))).thenReturn(topic1, topic2);
+    when(listTopicSubscriptionsPagedResponse.iterateAll()).thenReturn(new ArrayList<>());
 
     testManager.createTopic("topic1");
     testManager.createTopic("topic2");
@@ -235,6 +244,23 @@ public final class PubsubResourceManagerTest {
     assertThat(topicNameCaptor.getAllValues()).containsExactly(topicName1, topicName2);
   }
 
+  @Test
+  public void testCleanupTopicsShouldDeleteSubscriptions() {
+    TopicName topicName1 = testManager.getTopicName("topic1");
+    Topic topic1 = Topic.newBuilder().setName(topicName1.toString()).build();
+    when(topicAdminClient.createTopic(any(TopicName.class))).thenReturn(topic1);
+    when(listTopicSubscriptionsPagedResponse.iterateAll())
+        .thenReturn(Arrays.asList("topic1-generated-sub"));
+
+    testManager.createTopic("topic1");
+    testManager.cleanupAll();
+
+    verify(topicAdminClient, times(1)).deleteTopic(topicNameCaptor.capture());
+    assertThat(topicNameCaptor.getAllValues()).containsExactly(topicName1);
+    verify(subscriptionAdminClient, times(1)).deleteSubscription(stringArgumentCaptor.capture());
+    assertThat(stringArgumentCaptor.getAllValues().get(0)).contains("topic1-generated-sub");
+  }
+
   @Test
   public void testCleanupSubscriptionsShouldDeleteResources() {
     SubscriptionName subscriptionName1 = testManager.getSubscriptionName("topic1-sub0");

Reply via email to