This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 8e9ac40 [Tests] Fix flaky tests and memory leak related to
ContainerAllocator tests (#1556)
8e9ac40 is described below
commit 8e9ac406de8af0ee06ea4636386f5fa2b96d4125
Author: Cameron Lee <[email protected]>
AuthorDate: Wed Nov 10 11:08:09 2021 -0800
[Tests] Fix flaky tests and memory leak related to ContainerAllocator tests
(#1556)
---
build.gradle | 2 +-
.../TestContainerAllocatorWithHostAffinity.java | 14 +++++++-------
.../TestContainerAllocatorWithoutHostAffinity.java | 2 +-
.../samza/clustermanager/TestContainerProcessManager.java | 5 +++++
4 files changed, 14 insertions(+), 9 deletions(-)
diff --git a/build.gradle b/build.gradle
index ec1d182..2c4b67d 100644
--- a/build.gradle
+++ b/build.gradle
@@ -206,7 +206,7 @@ project(":samza-core_$scalaSuffix") {
test {
// some unit tests use embedded zookeeper, so adding some extra memory for
those
- maxHeapSize = "3072m"
+ maxHeapSize = "1560m"
jvmArgs = ["-XX:+UseConcMarkSweepGC", "-server"]
}
}
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 2c9ba81..2c79bb1 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -410,7 +410,7 @@ public class TestContainerAllocatorWithHostAffinity {
// State check when host affinity is enabled
assertTrue(state.matchedResourceRequests.get() == 2);
assertTrue(state.preferredHostRequests.get() == 2);
- containerAllocator.stop();
+ spyAllocator.stop();
}
@Test
@@ -433,7 +433,7 @@ public class TestContainerAllocatorWithHostAffinity {
spyAllocatorThread.start();
// Let the request expire, expiration timeout is 3 ms
- Thread.sleep(100);
+ Thread.sleep(1000);
// Verify that all the request that were created as preferred host
requests expired
assertTrue(state.preferredHostRequests.get() == 2);
@@ -453,7 +453,7 @@ public class TestContainerAllocatorWithHostAffinity {
// Check that atleast 2 ANY_HOST requests were made
assertTrue(state.matchedResourceRequests.get() == 0);
assertTrue(state.anyHostRequests.get() > 2);
- containerAllocator.stop();
+ spyAllocator.stop();
}
@Test
@@ -481,7 +481,7 @@ public class TestContainerAllocatorWithHostAffinity {
spyAllocatorThread.start();
// Let the request expire, expiration timeout is 3 ms
- Thread.sleep(100);
+ Thread.sleep(1000);
// Verify that all the request that were created as preferred host
requests expired
assertEquals(state.expiredPreferredHostRequests.get(), 2);
@@ -506,7 +506,7 @@ public class TestContainerAllocatorWithHostAffinity {
assertTrue(state.matchedResourceRequests.get() == 0);
assertTrue(state.preferredHostRequests.get() == 2);
assertTrue(state.anyHostRequests.get() == 0);
- containerAllocator.stop();
+ spyAllocator.stop();
}
@Test(timeout = 5000)
@@ -551,7 +551,7 @@ public class TestContainerAllocatorWithHostAffinity {
.forEach(resource -> assertEquals(resource.getHost(), "host-0"));
// Verify resources were released
assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
- containerAllocator.stop();
+ spyAllocator.stop();
}
//@Test
@@ -643,7 +643,7 @@ public class TestContainerAllocatorWithHostAffinity {
put("cluster-manager.container.count", "1");
put("cluster-manager.container.retry.count", "1");
put("cluster-manager.container.retry.window.ms", "1999999999");
- put("cluster-manager.container.request.timeout.ms", "3");
+ put("cluster-manager.container.request.timeout.ms", "500");
put("cluster-manager.allocator.sleep.ms", "1");
put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index 1f063d7..6f8edd4 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -95,7 +95,7 @@ public class TestContainerAllocatorWithoutHostAffinity {
@After
public void teardown() throws Exception {
jobModelManager.stop();
- validateMockitoUsage();
+ containerAllocator.stop();
}
private static Config getConfig() {
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index ad45c5e..29600b5 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -394,6 +394,7 @@ public class TestContainerProcessManager {
cpm.onResourceCompleted(new SamzaResourceStatus("id0", "diagnostics",
SamzaResourceStatus.SUCCESS));
verify(cpm,
never()).onResourceCompletedWithUnknownStatus(any(SamzaResourceStatus.class),
anyString(), anyString(), anyInt());
assertTrue(cpm.shouldShutdown());
+ cpm.stop();
}
@@ -748,6 +749,7 @@ public class TestContainerProcessManager {
assertFalse(cpm.shouldShutdown());
assertTrue(state.jobHealthy.get());
assertEquals(state.redundantNotifications.get(), 1);
+ cpm.stop();
}
@Test
@@ -781,6 +783,7 @@ public class TestContainerProcessManager {
manager.onStreamProcessorLaunchFailure(resource, new Exception("cannot
launch container!"));
Assert.assertEquals(clusterResourceManager.resourceRequests.size(), 2);
Assert.assertEquals(clusterResourceManager.resourceRequests.get(1).getHost(),
ResourceRequestState.ANY_HOST);
+ manager.stop();
}
@Test
@@ -852,6 +855,7 @@ public class TestContainerProcessManager {
cpm.onStreamProcessorLaunchSuccess(resource3);
assertTrue(state.jobHealthy.get());
+ cpm.stop();
}
@Test
@@ -923,6 +927,7 @@ public class TestContainerProcessManager {
assertEquals(2, clusterResourceManager.resourceRequests.size());
assertEquals(0, clusterResourceManager.releasedResources.size());
assertTrue(state.jobHealthy.get());
+ cpm.stop();
}
/**