This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new b8309aa  SAMZA-2475: Fix allocated resource expiry bug (#1310)
b8309aa is described below

commit b8309aa1950ad393cdf54063c2727d78d4f9d0bc
Author: Sanil Jain <[email protected]>
AuthorDate: Wed Mar 11 10:32:49 2020 -0700

    SAMZA-2475: Fix allocated resource expiry bug (#1310)
---
 .../samza/job/yarn/YarnClusterResourceManager.java |  2 +-
 .../job/yarn/TestYarnClusterResourceManager.java   | 25 ++++++++++++++++++++++
 2 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 8d23e04..eb97b69 100644
--- a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -580,7 +580,7 @@ public class YarnClusterResourceManager extends ClusterResourceManager implement
   public boolean isResourceExpired(SamzaResource resource) {
     // Time from which resource was allocated > Yarn Expiry Timeout - 30 sec (to account for clock skew)
     Duration yarnAllocatedResourceExpiry =
-        Duration.ofMinutes(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
+        Duration.ofMillis(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
             .minus(Duration.ofSeconds(30));
     return System.currentTimeMillis() - resource.getTimestamp() > yarnAllocatedResourceExpiry.toMillis();
   }
diff --git a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
index 042c91c..8f19eab 100644
--- a/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
+++ b/samza-yarn/src/test/java/org/apache/samza/job/yarn/TestYarnClusterResourceManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.job.yarn;
 
+import java.time.Duration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -31,7 +32,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.samza.clustermanager.ClusterResourceManager;
+import org.apache.samza.clustermanager.SamzaResource;
 import org.apache.samza.config.Config;
+import org.junit.Assert;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -78,4 +81,26 @@ public class TestYarnClusterResourceManager {
     assertEquals(0, yarnAppState.pendingProcessors.size());
     verify(callback, times(1)).onStreamProcessorLaunchFailure(anyObject(), any(Exception.class));
   }
+
+  @Test
+  public void testAllocatedResourceExpiryForYarn() {
+    YarnConfiguration yarnConfiguration = mock(YarnConfiguration.class);
+    SamzaAppMasterMetrics metrics = mock(SamzaAppMasterMetrics.class);
+    Config config = mock(Config.class);
+    AMRMClientAsync asyncClient = mock(AMRMClientAsync.class);
+    YarnAppState yarnAppState = new YarnAppState(0, mock(ContainerId.class), "host", 8080, 8081);
+    SamzaYarnAppMasterLifecycle lifecycle = mock(SamzaYarnAppMasterLifecycle.class);
+    SamzaYarnAppMasterService service = mock(SamzaYarnAppMasterService.class);
+    NMClientAsync asyncNMClient = mock(NMClientAsync.class);
+    ClusterResourceManager.Callback callback = mock(ClusterResourceManager.Callback.class);
+
+    // start the cluster manager
+    YarnClusterResourceManager yarnClusterResourceManager = new YarnClusterResourceManager(asyncClient, asyncNMClient,
+        callback, yarnAppState, lifecycle, service, metrics, yarnConfiguration, config);
+
+    SamzaResource allocatedResource = mock(SamzaResource.class);
+    when(allocatedResource.getTimestamp()).thenReturn(System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
+
+    Assert.assertTrue(yarnClusterResourceManager.isResourceExpired(allocatedResource));
+  }
 }
\ No newline at end of file

Reply via email to