This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c753eb  SAMZA-2544: Adding cleanup for Container Placement tests 
using threads, coordinator stream stores
8c753eb is described below

commit 8c753eb4ded7a7239f33042da903aa5fcc47dc75
Author: Sanil15 <[email protected]>
AuthorDate: Mon Jun 8 14:32:01 2020 -0700

    SAMZA-2544: Adding cleanup for Container Placement tests using threads, 
coordinator stream stores
    
    **Improvement** [Bug fix]: Adds teardown / cleanup (request allocator 
threads, coordinator stream stores) to the Container Placement integration tests
    
    **API changes:** None
    
    **Symptoms:** OOMs occur when the test suite is executed
    ```
    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "RMI RenewClean-[]"
    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "Container Allocator Thread"
    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "RMI RenewClean-[]"
    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "ContainerPlacement Request Allocator Thread"
    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "Container Allocator Thread"
    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "Container Allocator Thread"
    Exception: java.lang.OutOfMemoryError thrown from the 
UncaughtExceptionHandler in thread "Container Allocator Thread"
    ```
    
    **Upgrade Instructions:** None
    
    **Usage Instructions:** None
    
    Author: Sanil15 <[email protected]>
    
    Reviewers: mynameborat <[email protected]>
    
    Closes #1378 from Sanil15/oom-fix
---
 .../TestContainerPlacementActions.java             | 36 ++++++++++++++++++----
 1 file changed, 30 insertions(+), 6 deletions(-)

diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 18e6cb4..87bf85e 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -51,6 +51,7 @@ import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.testUtils.MockHttpServer;
 import org.eclipse.jetty.servlet.DefaultServlet;
 import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -164,6 +165,13 @@ public class TestContainerPlacementActions {
             clusterResourceManager, Optional.of(allocatorWithHostAffinity), 
containerManager);
   }
 
+  @After
+  public void teardown() {
+    containerPlacementMetadataStore.stop();
+    cpm.stop();
+    coordinatorStreamStore.close();
+  }
+
   public void setupStandby() throws Exception {
     state = new 
SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0",
 "host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
     callback = mock(ClusterResourceManager.Callback.class);
@@ -266,9 +274,10 @@ public class TestContainerPlacementActions {
   @Test(timeout = 30000)
   public void testActionQueuingForConsecutivePlacementActions() throws 
Exception {
     // Spawn a Request Allocator Thread
-    Thread requestAllocatorThread = new Thread(
-        new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new 
ApplicationConfig(config)),
-        "ContainerPlacement Request Allocator Thread");
+    ContainerPlacementRequestAllocator requestAllocator =
+        new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new 
ApplicationConfig(config));
+    Thread requestAllocatorThread = new Thread(requestAllocator, 
"ContainerPlacement Request Allocator Thread");
+
     requestAllocatorThread.start();
 
     doAnswer(new Answer<Void>() {
@@ -364,6 +373,9 @@ public class TestContainerPlacementActions {
     // Requests from Previous deploy must be cleaned
     
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMoveBad).isPresent());
     
assertFalse(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMoveBad).isPresent());
+
+    // Cleanup Request Allocator Thread
+    cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
   }
 
   @Test(timeout = 10000)
@@ -837,9 +849,9 @@ public class TestContainerPlacementActions {
     setupStandby();
 
     // Spawn a Request Allocator Thread
-    Thread requestAllocatorThread = new Thread(
-        new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new 
ApplicationConfig(config)),
-        "ContainerPlacement Request Allocator Thread");
+    ContainerPlacementRequestAllocator requestAllocator =
+        new 
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new 
ApplicationConfig(config));
+    Thread requestAllocatorThread = new Thread(requestAllocator, 
"ContainerPlacement Request Allocator Thread");
     requestAllocatorThread.start();
 
     doAnswer(new Answer<Void>() {
@@ -976,6 +988,9 @@ public class TestContainerPlacementActions {
     // Request should be deleted as soon as ita accepted / being acted upon
     
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(standbyMoveRequest).isPresent());
     
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(activeMoveRequest).isPresent());
+
+    // Cleanup Request Allocator Thread
+    cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
   }
 
   private void assertResponseMessage(ContainerPlacementResponseMessage 
responseMessage,
@@ -1011,4 +1026,13 @@ public class TestContainerPlacementActions {
     // Request shall be deleted as soon as it is acted upon
     
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
   }
+
+  private void 
cleanUpRequestAllocatorThread(ContainerPlacementRequestAllocator 
requestAllocator, Thread containerPlacementRequestAllocatorThread) {
+    requestAllocator.stop();
+    try {
+      containerPlacementRequestAllocatorThread.join();
+    } catch (InterruptedException ie) {
+      Thread.currentThread().interrupt();
+    }
+  }
 }

Reply via email to