This is an automated email from the ASF dual-hosted git repository.

rayman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 796d97b  Revert "SAMZA-2475: Add a allocated resource expiry timeout 
in samza yarn type of apps (#1296)" (#1302)
796d97b is described below

commit 796d97bc5f354580eafcc4208e5320530862ccc1
Author: rmatharu <[email protected]>
AuthorDate: Wed Mar 4 13:10:22 2020 -0800

    Revert "SAMZA-2475: Add a allocated resource expiry timeout in samza yarn 
type of apps (#1296)" (#1302)
    
    This reverts commit 3a6e48cc91d3e03e8fe17ab4283182fe4d3f98a5.
---
 .../clustermanager/ClusterResourceManager.java     |  9 -----
 .../samza/clustermanager/ContainerAllocator.java   |  8 ----
 .../samza/clustermanager/ContainerManager.java     | 23 -----------
 .../apache/samza/clustermanager/SamzaResource.java | 18 ---------
 .../clustermanager/MockClusterResourceManager.java | 12 ------
 .../TestContainerAllocatorWithHostAffinity.java    | 45 ----------------------
 .../samza/job/yarn/YarnClusterResourceManager.java | 10 -----
 7 files changed, 125 deletions(-)

diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
index 8ea3c30..276bb4c 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ClusterResourceManager.java
@@ -132,15 +132,6 @@ public abstract class ClusterResourceManager {
 
   public abstract void stop(SamzaApplicationState.SamzaAppStatus status);
 
-  /**
-   * Checks if the allocated resource is expired. If the {@link 
ClusterResourceManager} does not have a
-   * concept of expired allocated resource we assume allocated resources never 
expire
-   * @param resource allocated resource
-   * @return if the allocated resource is expired
-   */
-  public boolean isResourceExpired(SamzaResource resource) {
-    return false;
-  }
 
   /***
    * Defines a callback interface for interacting with notifications from a 
ClusterResourceManager
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
index 2661611..2e223fc 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerAllocator.java
@@ -261,14 +261,6 @@ public class ContainerAllocator implements Runnable {
       throw new SamzaException("Expected resource for Processor ID: " + 
request.getProcessorId() + " was unavailable on host: " + preferredHost);
     }
 
-    /**
-     * If the allocated resource has expired then release the expired resource 
and re-request the resources from {@link ClusterResourceManager}
-     */
-    if (clusterResourceManager.isResourceExpired(resource)) {
-      containerManager.handleExpiredResource(request, resource, preferredHost, 
resourceRequestState, this);
-      return;
-    }
-
     // Update state
     resourceRequestState.updateStateAfterAssignment(request, preferredHost, 
resource);
     String processorId = request.getProcessorId();
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
index 7ba97ee..3e3a060 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/ContainerManager.java
@@ -305,29 +305,6 @@ public class ContainerManager {
   }
 
   /**
-   * Handles expired allocated resource by requesting the same resource again 
and release the expired allocated resource
-   *
-   * @param request pending request for the preferred host
-   * @param resource resource allocated from {@link ClusterResourceManager} 
which has expired
-   * @param preferredHost host on which container is requested to be deployed
-   * @param resourceRequestState state of request in {@link ContainerAllocator}
-   * @param allocator allocator for requesting resources
-   */
-  void handleExpiredResource(SamzaResourceRequest request, SamzaResource 
resource, String preferredHost,
-      ResourceRequestState resourceRequestState, ContainerAllocator allocator) 
{
-    LOG.info("Allocated resource {} has expired for Processor ID: {} request: 
{}. Re-requesting resource again",
-        resource, request.getProcessorId(), request);
-    resourceRequestState.releaseUnstartableContainer(resource, preferredHost);
-    resourceRequestState.cancelResourceRequest(request);
-    SamzaResourceRequest newResourceRequest = 
allocator.getResourceRequest(request.getProcessorId(), 
request.getPreferredHost());
-    if 
(hasActiveContainerPlacementAction(newResourceRequest.getProcessorId())) {
-      ContainerPlacementMetadata metadata = 
getPlacementActionMetadata(request.getProcessorId()).get();
-      metadata.recordResourceRequest(newResourceRequest);
-    }
-    allocator.issueResourceRequest(newResourceRequest);
-  }
-
-  /**
    * Registers a container placement action to move the running container to 
destination host, if destination host is same as the
    * host on which container is running, container placement action is treated 
as a restart.
    *
diff --git 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
index 30c0902..4d0bf91 100644
--- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
+++ 
b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaResource.java
@@ -19,9 +19,6 @@
 
 package org.apache.samza.clustermanager;
 
-import com.google.common.annotations.VisibleForTesting;
-
-
 /**
  * Specification of a Samza Resource. A resource is identified by a unique 
resource ID.
  * A resource is currently comprised of CPUs and Memory resources on a host.
@@ -31,7 +28,6 @@ public class SamzaResource {
   private final int memoryMb;
   private final String host;
   private final String containerId;
-  private final long timestamp;
 
   //TODO: Investigate adding disk space. Mesos supports disk based 
reservations.
 
@@ -40,16 +36,6 @@ public class SamzaResource {
     this.memoryMb = memoryMb;
     this.host = host;
     this.containerId = containerId;
-    this.timestamp = System.currentTimeMillis();
-  }
-
-  @VisibleForTesting
-  SamzaResource(int numCores, int memoryMb, String host, String containerId, 
long timestamp) {
-    this.numCores = numCores;
-    this.memoryMb = memoryMb;
-    this.host = host;
-    this.containerId = containerId;
-    this.timestamp = timestamp;
   }
 
   @Override
@@ -96,8 +82,4 @@ public class SamzaResource {
   public String getContainerId() {
     return containerId;
   }
-
-  public long getTimestamp() {
-    return timestamp;
-  }
 }
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
index e4be156..d50ce59 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/MockClusterResourceManager.java
@@ -20,7 +20,6 @@
 package org.apache.samza.clustermanager;
 
 import com.google.common.collect.ImmutableList;
-import java.time.Duration;
 import org.apache.samza.job.CommandBuilder;
 import org.junit.Assert;
 
@@ -109,13 +108,6 @@ public class MockClusterResourceManager extends 
ClusterResourceManager {
     clusterManagerCallback.onResourcesCompleted(statList);
   }
 
-  @Override
-  public boolean isResourceExpired(SamzaResource resource) {
-    Duration yarnAllocatedResourceExpiry = 
Duration.ofMinutes(10).minus(Duration.ofSeconds(30));
-    return System.currentTimeMillis() - resource.getTimestamp() > 
yarnAllocatedResourceExpiry.toMillis();
-  }
-
-
   public void registerContainerListener(MockContainerListener listener) {
     mockContainerListeners.add(listener);
   }
@@ -124,10 +116,6 @@ public class MockClusterResourceManager extends 
ClusterResourceManager {
     mockContainerListeners.clear();
   }
 
-  public boolean containsReleasedResource(SamzaResource resource) {
-    return releasedResources.contains(resource);
-  }
-
   @Override
   public void stop(SamzaApplicationState.SamzaAppStatus status) {
 
diff --git 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
index 593ddb9..bf2eabf 100644
--- 
a/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
+++ 
b/samza-core/src/test/java/org/apache/samza/clustermanager/TestContainerAllocatorWithHostAffinity.java
@@ -511,51 +511,6 @@ public class TestContainerAllocatorWithHostAffinity {
     containerAllocator.stop();
   }
 
-  @Test(timeout = 5000)
-  public void testExpiredAllocatedResourcesAreReleased() throws Exception {
-    ClusterResourceManager.Callback mockCPM = 
mock(MockClusterResourceManagerCallback.class);
-    MockClusterResourceManager mockClusterResourceManager = new 
MockClusterResourceManager(mockCPM, state);
-    ContainerManager spyContainerManager =
-        spy(new ContainerManager(containerPlacementMetadataStore, state, 
mockClusterResourceManager, true, false));
-
-    SamzaResource expiredAllocatedResource = new SamzaResource(1, 1000, 
"host-0", "id0",
-        System.currentTimeMillis() - Duration.ofMinutes(10).toMillis());
-    spyAllocator = Mockito.spy(
-        new ContainerAllocator(mockClusterResourceManager, config, state, 
true, spyContainerManager));
-    spyAllocator.addResource(expiredAllocatedResource);
-    spyAllocator.addResource(new SamzaResource(1, 1000, "host-1", "1d1"));
-
-    // Request Preferred Resources
-    spyAllocator.requestResources(new HashMap<String, String>() {
-      {
-        put("0", "host-0");
-        put("1", "host-1");
-      }
-    });
-
-    spyAllocatorThread = new Thread(spyAllocator);
-    // Start the container allocator thread periodic assignment
-    spyAllocatorThread.start();
-
-    // Wait until allocated resource is expired
-    while (state.preferredHostRequests.get() != 3) {
-      Thread.sleep(100);
-    }
-
-    // Verify that handleExpiredResource was invoked once for expired 
allocated resource
-    ArgumentCaptor<SamzaResourceRequest> resourceRequestCaptor = 
ArgumentCaptor.forClass(SamzaResourceRequest.class);
-    ArgumentCaptor<SamzaResource> resourceArgumentCaptor = 
ArgumentCaptor.forClass(SamzaResource.class);
-    verify(spyContainerManager, 
times(1)).handleExpiredResource(resourceRequestCaptor.capture(),
-        resourceArgumentCaptor.capture(), eq("host-0"), any(), any());
-    resourceRequestCaptor.getAllValues()
-        .forEach(resourceRequest -> 
assertEquals(resourceRequest.getProcessorId(), "0"));
-    resourceArgumentCaptor.getAllValues()
-        .forEach(resource -> assertEquals(resource.getHost(), "host-0"));
-    // Verify resources were released
-    
assertTrue(mockClusterResourceManager.containsReleasedResource(expiredAllocatedResource));
-    containerAllocator.stop();
-  }
-
   //@Test
   public void testExpiryWithNonResponsiveClusterManager() throws Exception {
 
diff --git 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
index 8d23e04..43c49cc 100644
--- 
a/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
+++ 
b/samza-yarn/src/main/java/org/apache/samza/job/yarn/YarnClusterResourceManager.java
@@ -19,7 +19,6 @@
 
 package org.apache.samza.job.yarn;
 
-import java.time.Duration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -576,15 +575,6 @@ public class YarnClusterResourceManager extends 
ClusterResourceManager implement
     }
   }
 
-  @Override
-  public boolean isResourceExpired(SamzaResource resource) {
-    // Time from which resource was allocated > Yarn Expiry Timeout - 30 sec 
(to account for clock skew)
-    Duration yarnAllocatedResourceExpiry =
-        
Duration.ofMinutes(YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS)
-            .minus(Duration.ofSeconds(30));
-    return System.currentTimeMillis() - resource.getTimestamp() > 
yarnAllocatedResourceExpiry.toMillis();
-  }
-
   /**
    * Runs a process as specified by the command builder on the container.
    * @param processorId id of the samza processor to run (passed as a command 
line parameter to the process)

Reply via email to