This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d5c9ef  SAMZA-2475: Add a allocated resource expiry timeout in samza yarn type of apps (#1303)
9d5c9ef is described below

commit 9d5c9ef87e0c27d35e136998f4ef9f1df561cb4c
Author: mynameborat <[email protected]>
AuthorDate: Wed Mar 4 14:54:12 2020 -0800

    SAMZA-2475: Add a allocated resource expiry timeout in samza yarn type of apps (#1303)
---
 .../clustermanager/ClusterResourceManager.java     |  9 +++++
 .../samza/clustermanager/ContainerAllocator.java   |  8 ++++
 .../samza/clustermanager/ContainerManager.java     | 23 +++++++++++
 .../apache/samza/clustermanager/SamzaResource.java | 18 +++++++++
 .../clustermanager/MockClusterResourceManager.java | 12 ++++++
 .../TestContainerAllocatorWithHostAffinity.java    | 45 ++++++++++++++++++++++
 .../samza/job/yarn/YarnClusterResourceManager.java | 10 +++++
 7 files changed, 125 insertions(+)

diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 276bb4c..8ea3c30 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -132,6 +132,15 @@ public abstract class ClusterResourceManager {
 
   public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
 
+  /**
+   * Checks whether an allocated resource has expired. Cluster managers that have no notion of allocation
+   * expiry can rely on this default, which treats every allocated resource as never expiring.
+   * @param resource allocated resource
+   * @return {@code true} if the allocated resource has expired, {@code false} otherwise
+   */
+  public boolean isResourceExpired(SamzaResource resource) {
+    return false;
+  }
 
   /***
    * Defines a callback interface for interacting with notifications from a ClusterResourceManager
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 2e223fc..2661611 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -261,6 +261,14 @@ public class ContainerAllocator implements Runnable {
       throw new SamzaException("Expected resource for Processor ID: " + request.getProcessorId() + " was unavailable on host: " + preferredHost);
     }
 
+    // If the allocated resource has expired, do not assign this request to it. Instead release the expired
+    // resource and re-request a resource from the ClusterResourceManager; handleExpiredResource does both,
+    // and we return before any assignment state is updated.
+    if (clusterResourceManager.isResourceExpired(resource)) {
+      containerManager.handleExpiredResource(request, resource, preferredHost, resourceRequestState, this);
+      return;
+    }
+
     // Update state
     resourceRequestState.updateStateAfterAssignment(request, preferredHost, resource);
     String processorId = request.getProcessorId();
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 3e3a060..7ba97ee 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -305,6 +305,29 @@ public class ContainerManager {
   }
 
   /**
+   * Handles an expired allocated resource by releasing it and re-requesting a resource for the same processor.
+   *
+   * @param request pending request the expired resource was matched to; it is cancelled and replaced by a new request
+   * @param resource resource allocated from {@link ClusterResourceManager} which has expired and is released
+   * @param preferredHost host on which the container was requested to be deployed
+   * @param resourceRequestState state of requests in {@link ContainerAllocator}
+   * @param allocator allocator used to build and issue the replacement resource request
+   */
+  void handleExpiredResource(SamzaResourceRequest request, SamzaResource resource, String preferredHost,
+      ResourceRequestState resourceRequestState, ContainerAllocator allocator) {
+    LOG.info("Allocated resource {} has expired for Processor ID: {} request: {}. Re-requesting resource again",
+        resource, request.getProcessorId(), request);
+    resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
+    resourceRequestState.cancelResourceRequest(request);
+    SamzaResourceRequest newResourceRequest = allocator.getResourceRequest(request.getProcessorId(), request.getPreferredHost());
+    if (hasActiveContainerPlacementAction(request.getProcessorId())) {
+      ContainerPlacementMetadata metadata = getPlacementActionMetadata(request.getProcessorId()).get();
+      metadata.recordResourceRequest(newResourceRequest);
+    }
+    allocator.issueResourceRequest(newResourceRequest);
+  }
+
+  /**
    * Registers a container placement action to move the running container to destination host, if destination host is same as the
    * host on which container is running, container placement action is treated as a restart.
    *
diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
index 4d0bf91..30c0902 100644
--- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
+++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -19,6 +19,9 @@
 
 package org.apache.samza.clustermanager;
 
+import com.google.common.annotations.VisibleForTesting;
+
+
 /**
  * Specification of a Samza Resource. A resource is identified by a unique resource ID.
  * A resource is currently comprised of CPUs and Memory resources on a host.
@@ -28,6 +31,7 @@ public class SamzaResource {
   private final int memoryMb;
   private final String host;
   private final String containerId;
+  private final long timestamp; // creation time in epoch millis; used as the allocation time for expiry checks
 
   //TODO: Investigate adding disk space. Mesos supports disk based reservations.
 
@@ -36,6 +40,16 @@ public class SamzaResource {
     this.memoryMb = memoryMb;
     this.host = host;
     this.containerId = containerId;
+    this.timestamp = System.currentTimeMillis();
+  }
+
+  @VisibleForTesting
+  SamzaResource(int numCores, int memoryMb, String host, String containerId, long timestamp) {
+    this.numCores = numCores;
+    this.memoryMb = memoryMb;
+    this.host = host;
+    this.containerId = containerId;
+    this.timestamp = timestamp;
   }
 
   @Override
@@ -82,4 +96,8 @@ public class SamzaResource {
   public String getContainerId() {
     return containerId;
   }
+
+  public long getTimestamp() {
+    return timestamp;
+  }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index d50ce59..e4be156 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -20,6 +20,7 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.collect.ImmutableList;
+import java.time.Duration;
 import org.apache.samza.job.CommandBuilder;
 import org.junit.Assert;
 
@@ -108,6 +109,13 @@ public class MockClusterResourceManager extends ClusterResourceManager {
     clusterManagerCallback.onResourcesCompleted(statList);
   }
 
+  @Override
+  public boolean isResourceExpired(SamzaResource resource) {
+    // Models YARN's default 10 minute allocation expiry, minus 30 seconds to allow for clock skew.
+    Duration yarnAllocatedResourceExpiry = Duration.ofMinutes(10).minus(Duration.ofSeconds(30));
+    return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
+  }
+
   public void registerContainerListener(MockContainerListener listener) {
     mockContainerListeners.add(listener);
   }
@@ -116,6 +124,10 @@ public class MockClusterResourceManager extends ClusterResourceManager {
     mockContainerListeners.clear();
   }
 
+  public boolean containsReleasedResource(SamzaResource resource) {
+    return releasedResources.contains(resource);
+  }
+
   @Override
   public void stop(SamzaApplicationState.SamzaAppStatus status) {
 
diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index bf2eabf..593ddb9 100644
--- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -511,6 +511,51 @@ public class TestContainerAllocatorWithHostAffinity {
     containerAllocator.stop();
   }
 
+  @Test(timeout = 5000)
+  public void testExpiredAllocatedResourcesAreReleased() throws Exception {
+    ClusterResourceManager.Callback mockCPM = mock(MockClusterResourceManagerCallback.class);
+    MockClusterResourceManager mockClusterResourceManager = new MockClusterResourceManager(mockCPM, state);
+    ContainerManager spyContainerManager =
+        spy(new ContainerManager(containerPlacementMetadataStore, state, mockClusterResourceManager, true, false));
+
+    SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, "host-0", "id0",
+        System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
+    spyAllocator = Mockito.spy(
+        new ContainerAllocator(mockClusterResourceManager, config, state, true, spyContainerManager));
+    spyAllocator.addResource(expiredAllocatedResource);
+    spyAllocator.addResource(new SamzaResource(1, 1000, "host-1", "id1"));
+
+    // Request Preferred Resources
+    spyAllocator.requestResources(new HashMap<String, String>() {
+      {
+        put("0", "host-0");
+        put("1", "host-1");
+      }
+    });
+
+    spyAllocatorThread = new Thread(spyAllocator);
+    // Start the container allocator thread periodic assignment
+    spyAllocatorThread.start();
+
+    // Wait for the third preferred-host request, i.e. the re-request issued for the expired resource
+    while (state.preferredHostRequests.get() != 3) {
+      Thread.sleep(100);
+    }
+
+    // Verify that handleExpiredResource was invoked once for expired allocated resource
+    ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
+    ArgumentCaptor<SamzaResource> resourceArgumentCaptor = ArgumentCaptor.forClass(SamzaResource.class);
+    verify(spyContainerManager, times(1)).handleExpiredResource(resourceRequestCaptor.capture(),
+        resourceArgumentCaptor.capture(), eq("host-0"), any(), any());
+    resourceRequestCaptor.getAllValues()
+        .forEach(resourceRequest -> assertEquals("0", resourceRequest.getProcessorId()));
+    resourceArgumentCaptor.getAllValues()
+        .forEach(resource -> assertEquals("host-0", resource.getHost()));
+    // Verify resources were released
+    assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
+    spyAllocator.stop();
+  }
+
   //@Test
   public void testExpiryWithNonResponsiveClusterManager() throws Exception {
 
diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 43c49cc..8d23e04 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.yarn;
 
+import java.time.Duration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -575,6 +576,15 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
     }
   }
 
+  @Override
+  public boolean isResourceExpired(SamzaResource resource) {
+    // Expired once time since allocation > YARN default allocation expiry (a value in ms) - 30 sec (to account for clock skew)
+    Duration yarnAllocatedResourceExpiry =
+        Duration.ofMillis(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
+            .minus(Duration.ofSeconds(30));
+    return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
+  }
+
   /**
    * Runs a process as specified by the command builder on the container.
    * @param processorId id of the samza processor to run (passed as a command line parameter to the process)

Reply via email to