This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e9ac40  [Tests] Fix flaky tests and memory leak related to ContainerAllocator tests (#1556)
8e9ac40 is described below

commit 8e9ac406de8af0ee06ea4636386f5fa2b96d4125
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Nov 10 11:08:09 2021 -0800

    [Tests] Fix flaky tests and memory leak related to ContainerAllocator tests (#1556)
---
 build.gradle                                               |  2 +-
 .../TestContainerAllocatorWithHostAffinity.java            | 14 +++++++-------
 .../TestContainerAllocatorWithoutHostAffinity.java         |  2 +-
 .../samza/clustermanager/TestContainerProcessManager.java  |  5 +++++
 4 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/build.gradle b/build.gradle
index ec1d182..2c4b67d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -206,7 +206,7 @@ project(":samza-core_$scalaSuffix") {
 
   test {
     // some unit tests use embedded zookeeper, so adding some extra memory for those
-    maxHeapSize = "3072m"
+    maxHeapSize = "1560m"
     jvmArgs = ["-XX:+UseConcMarkSweepGC", "-server"]
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2c9ba81..2c79bb1 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -410,7 +410,7 @@ public class TestContainerAllocatorWithHostAffinity {
     // State check when host affinity is enabled
     assertTrue(state.matchedResourceRequests.get() == 2);
     assertTrue(state.preferredHostRequests.get() == 2);
-    containerAllocator.stop();
+    spyAllocator.stop();
   }
 
   @Test
@@ -433,7 +433,7 @@ public class TestContainerAllocatorWithHostAffinity {
     spyAllocatorThread.start();
 
     // Let the request expire, expiration timeout is 3 ms
-    Thread.sleep(100);
+    Thread.sleep(1000);
 
     // Verify that all the request that were created as preferred host requests expired
     assertTrue(state.preferredHostRequests.get() == 2);
@@ -453,7 +453,7 @@ public class TestContainerAllocatorWithHostAffinity {
     // Check that atleast 2 ANY_HOST requests were made
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.anyHostRequests.get() > 2);
-    containerAllocator.stop();
+    spyAllocator.stop();
   }
 
   @Test
@@ -481,7 +481,7 @@ public class TestContainerAllocatorWithHostAffinity {
     spyAllocatorThread.start();
 
     // Let the request expire, expiration timeout is 3 ms
-    Thread.sleep(100);
+    Thread.sleep(1000);
 
     // Verify that all the request that were created as preferred host requests expired
     assertEquals(state.expiredPreferredHostRequests.get(), 2);
@@ -506,7 +506,7 @@ public class TestContainerAllocatorWithHostAffinity {
     assertTrue(state.matchedResourceRequests.get() == 0);
     assertTrue(state.preferredHostRequests.get() == 2);
     assertTrue(state.anyHostRequests.get() == 0);
-    containerAllocator.stop();
+    spyAllocator.stop();
   }
 
   @Test(timeout = 5000)
@@ -551,7 +551,7 @@ public class TestContainerAllocatorWithHostAffinity {
         .forEach(resource -> assertEquals(resource.getHost(), "host-0"));
     // Verify resources were released
     assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
-    containerAllocator.stop();
+    spyAllocator.stop();
   }
 
   //@Test
@@ -643,7 +643,7 @@ public class TestContainerAllocatorWithHostAffinity {
         put("cluster-manager.container.count", "1");
         put("cluster-manager.container.retry.count", "1");
         put("cluster-manager.container.retry.window.ms", "1999999999");
-        put("cluster-manager.container.request.timeout.ms", "3");
+        put("cluster-manager.container.request.timeout.ms", "500");
         put("cluster-manager.allocator.sleep.ms", "1");
         put("cluster-manager.container.memory.mb", "512");
         put("yarn.package.path", "/foo");
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index 1f063d7..6f8edd4 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -95,7 +95,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
   @After
   public void teardown() throws Exception {
     jobModelManager.stop();
-    validateMockitoUsage();
+    containerAllocator.stop();
   }
 
   private static Config getConfig() {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index ad45c5e..29600b5 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -394,6 +394,7 @@ public class TestContainerProcessManager {
     cpm.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics", SamzaResourceStatus.SUCCESS));
     verify(cpm, never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class), anyString(), anyString(), anyInt());
     assertTrue(cpm.shouldShutdown());
+    cpm.stop();
   }
 
 
@@ -748,6 +749,7 @@ public class TestContainerProcessManager {
     assertFalse(cpm.shouldShutdown());
     assertTrue(state.jobHealthy.get());
     assertEquals(state.redundantNotifications.get(), 1);
+    cpm.stop();
   }
 
   @Test
@@ -781,6 +783,7 @@ public class TestContainerProcessManager {
     manager.onStreamProcessorLaunchFailure(resource, new Exception("cannot launch container!"));
     Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 2);
     Assert.assertEquals(clusterResourceManager.resourceRequests.get(1).getHost(), ResourceRequestState.ANY_HOST);
+    manager.stop();
   }
 
   @Test
@@ -852,6 +855,7 @@ public class TestContainerProcessManager {
     cpm.onStreamProcessorLaunchSuccess(resource3);
 
     assertTrue(state.jobHealthy.get());
+    cpm.stop();
   }
 
   @Test
@@ -923,6 +927,7 @@ public class TestContainerProcessManager {
     assertEquals(2, clusterResourceManager.resourceRequests.size());
     assertEquals(0, clusterResourceManager.releasedResources.size());
     assertTrue(state.jobHealthy.get());
+    cpm.stop();
   }
 
   /**

Reply via email to