This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch 1.5.0
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/1.5.0 by this push:
new 6fe7efd SAMZA-2504: Improve Container Placement Flaky Test & Running
Time
6fe7efd is described below
commit 6fe7efd7d2e186a266d3c04395d94db001c1f7f8
Author: Sanil15 <[email protected]>
AuthorDate: Mon Jun 8 15:35:03 2020 -0700
SAMZA-2504: Improve Container Placement Flaky Test & Running Time
Improvement [Bug fix]:
- Fix a flaky test for Container Placements on Request status
- Improve the running time of Test suite from 40 secs to under 4 seconds
API changes: None
Upgrade Instructions: None
Usage Instructions: None
Author: Sanil15 <[email protected]>
Reviewers: mynameborat <[email protected]>
Closes #1376 from Sanil15/SAMZA-2504
(cherry picked from commit bcee407801d5966015009e9c1d0337ce7e13fc96)
Signed-off-by: mynameborat <[email protected]>
---
.../ContainerPlacementRequestAllocator.java | 24 ++++++++++++++++++++--
.../TestContainerPlacementActions.java | 22 ++++++++++++--------
2 files changed, 35 insertions(+), 11 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
index 5161cfb..7eb6175 100644
---
a/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
+++
b/samza-core/src/main/java/org/apache/samza/clustermanager/container/placement/ContainerPlacementRequestAllocator.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.clustermanager.container.placement;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.samza.clustermanager.ContainerProcessManager;
import org.apache.samza.config.ApplicationConfig;
@@ -50,7 +51,10 @@ public class ContainerPlacementRequestAllocator implements
Runnable {
* RunId of the app
*/
private final String appRunId;
-
+ /**
+ * Sleep time for container placement handler thread
+ */
+ private final int containerPlacementHandlerSleepMs;
public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore
containerPlacementMetadataStore, ContainerProcessManager manager,
ApplicationConfig config) {
Preconditions.checkNotNull(containerPlacementMetadataStore,
"containerPlacementMetadataStore cannot be null");
Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be
null");
@@ -58,6 +62,22 @@ public class ContainerPlacementRequestAllocator implements
Runnable {
this.containerPlacementMetadataStore = containerPlacementMetadataStore;
this.isRunning = true;
this.appRunId = config.getRunId();
+ this.containerPlacementHandlerSleepMs =
DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS;
+ }
+
+ @VisibleForTesting
+ /**
+ * Should only get used for testing, cannot make it package private because
end to end integeration test
+ * need package private methods which live in org.apache.samza.clustermanager
+ */
+ public ContainerPlacementRequestAllocator(ContainerPlacementMetadataStore
containerPlacementMetadataStore, ContainerProcessManager manager,
ApplicationConfig config, int containerPlacementHandlerSleepMs) {
+ Preconditions.checkNotNull(containerPlacementMetadataStore,
"containerPlacementMetadataStore cannot be null");
+ Preconditions.checkNotNull(manager, "ContainerProcessManager cannot be
null");
+ this.containerProcessManager = manager;
+ this.containerPlacementMetadataStore = containerPlacementMetadataStore;
+ this.isRunning = true;
+ this.appRunId = config.getRunId();
+ this.containerPlacementHandlerSleepMs = containerPlacementHandlerSleepMs;
}
@Override
@@ -75,7 +95,7 @@ public class ContainerPlacementRequestAllocator implements
Runnable {
containerPlacementMetadataStore.deleteAllContainerPlacementMessages(message.getUuid());
}
}
-
Thread.sleep(DEFAULT_CLUSTER_MANAGER_CONTAINER_PLACEMENT_HANDLER_SLEEP_MS);
+ Thread.sleep(containerPlacementHandlerSleepMs);
} catch (InterruptedException e) {
LOG.warn("Got InterruptedException in
ContainerPlacementRequestAllocator thread.", e);
Thread.currentThread().interrupt();
diff --git
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
index 87bf85e..49b013d 100644
---
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
+++
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerPlacementActions.java
@@ -64,6 +64,10 @@ import static org.mockito.Mockito.*;
/**
* Set of Integration tests for container placement actions
+ *
+ * Please note that semaphores are used wherever possible, there are some
Thread.sleep used for the main thread to check
+ * on state changes to atomic variables or synchroized metadata objects
because of difficulty of plugging semaphores to
+ * those pieces of logic
*/
@RunWith(MockitoJUnitRunner.class)
public class TestContainerPlacementActions {
@@ -275,7 +279,7 @@ public class TestContainerPlacementActions {
public void testActionQueuingForConsecutivePlacementActions() throws
Exception {
// Spawn a Request Allocator Thread
ContainerPlacementRequestAllocator requestAllocator =
- new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config));
+ new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config), 100);
Thread requestAllocatorThread = new Thread(requestAllocator,
"ContainerPlacement Request Allocator Thread");
requestAllocatorThread.start();
@@ -345,7 +349,7 @@ public class TestContainerPlacementActions {
== ContainerPlacementMessage.StatusCode.SUCCEEDED) {
break;
}
- Thread.sleep(Duration.ofSeconds(5).toMillis());
+ Thread.sleep(100);
}
assertEquals(state.preferredHostRequests.get(), 4);
@@ -647,8 +651,9 @@ public class TestContainerPlacementActions {
fail("timed out waiting for the containers to start");
}
- // Wait for both the containers to be in running state
- while (state.runningProcessors.size() != 2) {
+ // Wait for both the containers to be in running state & control action
metadata to succeed
+ while (state.runningProcessors.size() != 2
+ && metadata.getActionStatus() !=
ContainerPlacementMessage.StatusCode.SUCCEEDED) {
Thread.sleep(100);
}
@@ -660,8 +665,6 @@ public class TestContainerPlacementActions {
assertEquals(state.anyHostRequests.get(), 0);
// Failed processors must be empty
assertEquals(state.failedProcessors.size(), 0);
- // Control Action should be success in this case
- assertEquals(metadata.getActionStatus(),
ContainerPlacementMessage.StatusCode.SUCCEEDED);
}
@Test(timeout = 10000)
@@ -850,8 +853,9 @@ public class TestContainerPlacementActions {
// Spawn a Request Allocator Thread
ContainerPlacementRequestAllocator requestAllocator =
- new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config));
+ new
ContainerPlacementRequestAllocator(containerPlacementMetadataStore, cpm, new
ApplicationConfig(config), 100);
Thread requestAllocatorThread = new Thread(requestAllocator,
"ContainerPlacement Request Allocator Thread");
+
requestAllocatorThread.start();
doAnswer(new Answer<Void>() {
@@ -923,7 +927,7 @@ public class TestContainerPlacementActions {
== ContainerPlacementMessage.StatusCode.BAD_REQUEST) {
break;
}
- Thread.sleep(Duration.ofSeconds(5).toMillis());
+ Thread.sleep(100);
}
// App running state should remain the same
@@ -960,7 +964,7 @@ public class TestContainerPlacementActions {
== ContainerPlacementMessage.StatusCode.SUCCEEDED) {
break;
}
- Thread.sleep(Duration.ofSeconds(5).toMillis());
+ Thread.sleep(100);
}
assertEquals(4, state.runningProcessors.size());