This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch 1.5.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.5.0 by this push:
new 9f769aa SAMZA-2544: Adding cleanup for Container Placement tests
using threads, coordinator stream stores
9f769aa is described below
commit 9f769aaac6e2b65dc602cac40fc4f65571291101
Author: Sanil15 <[email protected]>
AuthorDate: Mon Jun 8 14:32:01 2020 -0700
SAMZA-2544: Adding cleanup for Container Placement tests using threads,
coordinator stream stores
**Improvement** [Bug fix]: Adding teardown / cleanup to Container
placements integration tests
**API changes:** None
**Symptoms:**: Creates OOMs when test suite is executed
```
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "RMI RenewClean-[]"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "RMI RenewClean-[]"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "ContainerPlacement Request Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "Container Allocator Thread"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "Container Allocator Thread"
```
**Upgrade Instructions:** None
**Usage Instructions:** None
Author: Sanil15 <[email protected]>
Reviewers: mynameborat <[email protected]>
Closes #1378 from Sanil15/oom-fix
(cherry picked from commit 8c753eb4ded7a7239f33042da903aa5fcc47dc75)
Signed-off-by: mynameborat <[email protected]>
---
.../TestContainerPlacementActions.java | 36 ++++++++++++++++++----
1 file changed, 30 insertions(+), 6 deletions(-)
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 18e6cb4..87bf85e 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -51,6 +51,7 @@ import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.testUtils.MockHttpServer;
import org.eclipse.jetty.servlet.DefaultServlet;
import org.eclipse.jetty.servlet.ServletHolder;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -164,6 +165,13 @@ public class TestContainerPlacementActions {
clusterResourceManager, Optional.of(allocatorWithHostAffinity),
containerManager);
}
+ @After
+ public void teardown() {
+ containerPlacementMetadataStore.stop();
+ cpm.stop();
+ coordinatorStreamStore.close();
+ }
+
public void setupStandby() throws Exception {
state = new
SamzaApplicationState(getJobModelManagerWithHostAffinityWithStandby(ImmutableMap.of("0",
"host-1", "1", "host-2", "0-0", "host-2", "1-0", "host-1")));
callback = mock(ClusterResourceManager.Callback.class);
@@ -266,9 +274,10 @@ public class TestContainerPlacementActions {
@Test(timeout = 30000)
public void testActionQueuingForConsecutivePlacementActions() throws
Exception {
// Spawn a Request Allocator Thread
- Thread requestAllocatorThread = new Thread(
- new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config)),
- "ContainerPlacement Request Allocator Thread");
+ ContainerPlacementRequestAllocator requestAllocator =
+ new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config));
+ Thread requestAllocatorThread = new Thread(requestAllocator,
"ContainerPlacement Request Allocator Thread");
+
requestAllocatorThread.start();
doAnswer(new Answer<Void>() {
@@ -364,6 +373,9 @@ public class TestContainerPlacementActions {
// Requests from Previous deploy must be cleaned
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestUUIDMoveBad).isPresent());
assertFalse(containerPlacementMetadataStore.readContainerPlacementResponseMessage(requestUUIDMoveBad).isPresent());
+
+ // Cleanup Request Allocator Thread
+ cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
}
@Test(timeout = 10000)
@@ -837,9 +849,9 @@ public class TestContainerPlacementActions {
setupStandby();
// Spawn a Request Allocator Thread
- Thread requestAllocatorThread = new Thread(
- new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config)),
- "ContainerPlacement Request Allocator Thread");
+ ContainerPlacementRequestAllocator requestAllocator =
+ new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config));
+ Thread requestAllocatorThread = new Thread(requestAllocator,
"ContainerPlacement Request Allocator Thread");
requestAllocatorThread.start();
doAnswer(new Answer<Void>() {
@@ -976,6 +988,9 @@ public class TestContainerPlacementActions {
// Request should be deleted as soon as ita accepted / being acted upon
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(standbyMoveRequest).isPresent());
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(activeMoveRequest).isPresent());
+
+ // Cleanup Request Allocator Thread
+ cleanUpRequestAllocatorThread(requestAllocator, requestAllocatorThread);
}
private void assertResponseMessage(ContainerPlacementResponseMessage
responseMessage,
@@ -1011,4 +1026,13 @@ public class TestContainerPlacementActions {
// Request shall be deleted as soon as it is acted upon
assertFalse(containerPlacementMetadataStore.readContainerPlacementRequestMessage(requestMessage.getUuid()).isPresent());
}
+
+ private void
cleanUpRequestAllocatorThread(ContainerPlacementRequestAllocator
requestAllocator, Thread containerPlacementRequestAllocatorThread) {
+ requestAllocator.stop();
+ try {
+ containerPlacementRequestAllocatorThread.join();
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
}