This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new d99a870 Revert "Samza-2330: Handle expired resource request for
Container allocator when host affinity is disabled"
d99a870 is described below
commit d99a87016d905a05acbae15bce11be38906e2c5e
Author: Sanil Jain <[email protected]>
AuthorDate: Fri Oct 25 11:43:01 2019 -0700
Revert "Samza-2330: Handle expired resource request for Container allocator
when host affinity is disabled"
This reverts commit 2ae34502e3526cbb8e275d87c26c7bc4ce2d8ed4.
---
.../versioned/jobs/samza-configurations.md | 2 +-
.../samza/clustermanager/ContainerAllocator.java | 16 ++++---
.../TestContainerAllocatorWithHostAffinity.java | 10 ++--
.../TestContainerAllocatorWithoutHostAffinity.java | 55 +---------------------
.../TestContainerProcessManager.java | 2 +-
5 files changed, 18 insertions(+), 67 deletions(-)
diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md
index ac5004e..8637a4a 100644
--- a/docs/learn/documentation/versioned/jobs/samza-configurations.md
+++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md
@@ -300,7 +300,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
|cluster-manager.container.fail.job.after.retries|true|This configuration sets
the behavior of the job after all `cluster-manager.container.retry.count`s are
exhausted and each retry is within the
`cluster-manager.container.retry.window.ms` period on any single container. If
set to true, the whole job will fail if any container fails after the last
retry. If set to false, the job will continue to run without the failed
container. The typical use cases of setting this to false is to aid i [...]
|cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor
of `job.jmx.enabled`|
|cluster-manager.allocator.sleep.ms|3600|The container allocator thread is
responsible for matching requests to allocated containers. The sleep interval
for this thread is configured using this property.|
-|cluster-manager.container.request.timeout.ms|5000|The allocator thread
periodically checks the state of the container requests and allocated
containers to determine the assignment of a container to an allocated resource.
If no resource is obtained after cluster-manager.container.request.timeout.ms
the request is declared to be expired.. When a request expires, it gets
allocated to any available container that was returned by the cluster manager
if none is available the existing resource [...]
+|cluster-manager.container.request.timeout.ms|5000|The allocator thread
periodically checks the state of the container requests and allocated
containers to determine the assignment of a container to an allocated resource.
This property determines the number of milliseconds before a container request
is considered to have expired / timed-out. When a request expires, it gets
allocated to any available container that was returned by the cluster manager.|
|task.execute|bin/run-container.sh|The command that starts a Samza container.
The script must be included in the [job package](./packaging.html). There is
usually no need to customize this.|
|task.java.home| |The JAVA_HOME path for Samza containers. By setting this
property, you can use a java version that is different from your cluster's java
version. Remember to set the `yarn.am.java.home` as well.|
|yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has
one special container, the [ApplicationMaster](../yarn/application-master.html)
(AM), which manages the execution of the job. This property determines how much
memory, in megabytes, to request from YARN for running the ApplicationMaster.|
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 361d1eb..89855dc 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -56,10 +56,10 @@ import org.slf4j.LoggerFactory;
* When host-affinity is disabled, the resource-request's preferredHost
param is set to {@link ResourceRequestState#ANY_HOST}
* </li>
* <li>
- * When the preferred resource has not been obtained after {@code
requestExpiryTimeout} milliseconds of the request
- * being made, the resource is declared expired. Expired request are
handled by allocating them to *ANY*
- * allocated resource if available. If no surplus resources are available
the current preferred resource-request
- * is cancelled and resource-request for ANY_HOST is issued
+ * When host-affinity is enabled and a preferred resource has not been
obtained after {@code requestExpiryTimeout}
+ * milliseconds of the request being made, the resource is declared
expired. The expired request are handled by
+ * allocating them to *ANY* allocated resource if available. If no surplus
resources are available the current preferred
+ * resource-request is cancelled and resource-request for ANY_HOST is issued
* </li>
* <li>
* When host-affinity is not enabled, this periodically wakes up to assign
a processor to *ANY* allocated resource.
@@ -219,7 +219,9 @@ public class ContainerAllocator implements Runnable {
if (expired) {
updateExpiryMetrics(request);
- handleExpiredRequest(processorId, preferredHost, request);
+ if (hostAffinityEnabled) {
+ handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
+ }
} else {
LOG.info("Request for Processor ID: {} on preferred host {} has not
expired yet."
+ "Request creation time: {}. Current Time: {}. Request
timeout: {} ms", processorId, preferredHost,
@@ -233,10 +235,10 @@ public class ContainerAllocator implements Runnable {
/**
* Handles an expired resource request for both active and standby
containers. Since a preferred host cannot be obtained
* this method checks the availability of surplus ANY_HOST resources and
launches the container if available. Otherwise
- * issues an ANY_HOST request. This behavior holds regardless of
host-affinity enabled or not.
+ * issues an ANY_HOST request.
*/
@VisibleForTesting
- void handleExpiredRequest(String processorId, String preferredHost,
+ void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
SamzaResourceRequest request) {
boolean resourceAvailableOnAnyHost =
hasAllocatedResource(ResourceRequestState.ANY_HOST);
if (standbyContainerManager.isPresent()) {
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 823191b..927df89 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -425,9 +425,9 @@ public class TestContainerAllocatorWithHostAffinity {
// Verify that all the request that were created as preferred host
requests expired
assertTrue(state.preferredHostRequests.get() == 2);
assertTrue(state.expiredPreferredHostRequests.get() == 2);
- verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
any(SamzaResourceRequest.class));
- verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
any(SamzaResourceRequest.class));
// Verify that preferred host request were cancelled and since no surplus
resources were available
@@ -469,10 +469,10 @@ public class TestContainerAllocatorWithHostAffinity {
Thread.sleep(100);
// Verify that all the request that were created as preferred host
requests expired
- assertEquals(state.expiredPreferredHostRequests.get(), 2);
- verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
+ assertTrue(state.expiredPreferredHostRequests.get() == 2);
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
any(SamzaResourceRequest.class));
- verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
+ verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
any(SamzaResourceRequest.class));
// Verify that runStreamProcessor was invoked with already available
ANY_HOST requests
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
index f30f800..16eac0b 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithoutHostAffinity.java
@@ -24,7 +24,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobModelManager;
@@ -88,7 +87,6 @@ public class TestContainerAllocatorWithoutHostAffinity {
put("cluster-manager.container.count", "1");
put("cluster-manager.container.retry.count", "1");
put("cluster-manager.container.retry.window.ms", "1999999999");
- put("cluster-manager.container.request.timeout.ms", "3");
put("cluster-manager.allocator.sleep.ms", "10");
put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
@@ -284,61 +282,12 @@ public class TestContainerAllocatorWithoutHostAffinity {
resourceRequestCaptor.getAllValues()
.forEach(resourceRequest ->
assertEquals(resourceRequest.getPreferredHost(),
ResourceRequestState.ANY_HOST));
assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
- // Expiry currently should not be invoked
- verify(spyAllocator, never()).handleExpiredRequest(anyString(), anyString(),
+ // Expiry currently is only handled for host affinity enabled cases
+ verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
any(SamzaResourceRequest.class));
// Only updated when host affinity is enabled
assertTrue(state.matchedResourceRequests.get() == 0);
assertTrue(state.preferredHostRequests.get() == 0);
spyAllocator.stop();
}
-
- @Test
- public void testExpiredRequestAllocationOnAnyHost() throws Exception {
- MockClusterResourceManager spyManager = spy(new
MockClusterResourceManager(callback, state));
- spyAllocator = Mockito.spy(
- new ContainerAllocator(spyManager, config, state, false,
Optional.empty()));
-
- // Request Resources
- spyAllocator.requestResources(new HashMap<String, String>() {
- {
- put("0", "host-0");
- put("1", "host-1");
- }
- });
-
- spyThread = new Thread(spyAllocator);
- // Start the container allocator thread periodic assignment
- spyThread.start();
-
- // Let the request expire, expiration timeout is 3 ms
- Thread.sleep(100);
-
- // Verify that all the request that were created as ANY_HOST host
- // and all created requests expired
- assertEquals(state.preferredHostRequests.get(), 0);
- // Atleast 2 requests should expire & 2 ANY_HOST requests should be
generated
- assertTrue(state.anyHostRequests.get() >= 4);
- assertTrue(state.expiredAnyHostRequests.get() >= 2);
-
- verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("0"),
eq(ResourceRequestState.ANY_HOST),
- any(SamzaResourceRequest.class));
- verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("1"),
eq(ResourceRequestState.ANY_HOST),
- any(SamzaResourceRequest.class));
-
- // Verify that preferred host request were cancelled and since no surplus
resources were available
- // requestResource was invoked with ANY_HOST requests
- ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor =
ArgumentCaptor.forClass(SamzaResourceRequest.class);
- // At least 2 preferred host requests were cancelled
- verify(spyManager,
atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
- // Verify all the request cancelled were ANY_HOST
- assertTrue(cancelledRequestCaptor.getAllValues()
- .stream()
- .map(resourceRequest -> resourceRequest.getPreferredHost())
- .collect(Collectors.toSet())
- .size() == 1);
- containerAllocator.stop();
-
- }
-
}
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
index 1ee68aa..f100393 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerProcessManager.java
@@ -70,7 +70,7 @@ public class TestContainerProcessManager {
put("cluster-manager.container.retry.count", "1");
put("cluster-manager.container.retry.window.ms", "1999999999");
put("cluster-manager.allocator.sleep.ms", "1");
- put("cluster-manager.container.request.timeout.ms", "100");
+ put("cluster-manager.container.request.timeout.ms", "2");
put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
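
For readers skimming the diff, the behavior restored by this revert can be summarized in a small standalone sketch. It is illustrative only and not code from the Samza repository: the class name ExpiryDecisionSketch, the Action enum, and both method names are hypothetical, the expiry comparison is an assumption (the real check is not shown in this diff), and the standby-container branch is left out.

```java
// Hypothetical sketch of the post-revert decision; none of these names exist in Samza.
final class ExpiryDecisionSketch {

  /** Possible outcomes for one pending request on one allocator pass. */
  enum Action { WAIT, RECORD_EXPIRY_ONLY, RUN_ON_ANY_HOST, CANCEL_AND_REQUEST_ANY_HOST }

  /** Assumed expiry rule: the request is older than the configured timeout. */
  static boolean isExpired(long requestTimeMs, long nowMs, long timeoutMs) {
    return nowMs - requestTimeMs > timeoutMs;
  }

  /**
   * Mirrors the branch shown in the ContainerAllocator hunk: expiry metrics are
   * always updated for an expired request, but the request is only re-routed
   * when host affinity is enabled.
   */
  static Action decide(boolean hostAffinityEnabled, boolean expired, boolean anyHostResourceAvailable) {
    if (!expired) {
      return Action.WAIT;
    }
    if (!hostAffinityEnabled) {
      // After the revert, nothing beyond the metrics update happens here.
      return Action.RECORD_EXPIRY_ONLY;
    }
    // Host affinity enabled: use a surplus ANY_HOST resource if one is available,
    // otherwise cancel the preferred-host request and issue an ANY_HOST request.
    return anyHostResourceAvailable ? Action.RUN_ON_ANY_HOST : Action.CANCEL_AND_REQUEST_ANY_HOST;
  }

  public static void main(String[] args) {
    long timeoutMs = 5000L; // default of cluster-manager.container.request.timeout.ms
    boolean expired = isExpired(0L, 6000L, timeoutMs);
    System.out.println(decide(true, expired, false));  // host affinity enabled
    System.out.println(decide(false, expired, false)); // host affinity disabled
  }
}
```

With host affinity disabled, an expired request in this sketch is counted but otherwise left pending, which is consistent with the removal of testExpiredRequestAllocationOnAnyHost above.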