Repository: samza Updated Branches: refs/heads/master f6a2d54da -> 5c33fe772
SAMZA-1747: Add metric to measure effectiveness of host-affinity We require visibility into how effectively host-affinity performs. The goal is to help easily answer the following questions: - How effectively is YARN matching my preferred-host requests? - When does Samza fall back to abandoning locality and issuing any-host requests? design doc: https://docs.google.com/document/d/1oeNKDnG4JIGT2846us-jpnGW_RUjMjPIKDeEUlE_-jg/edit# Author: Jagadish <[email protected]> Reviewers: Prateek M<[email protected]> Closes #553 from vjagadish1989/hostaffinity-metrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/5c33fe77 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/5c33fe77 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/5c33fe77 Branch: refs/heads/master Commit: 5c33fe7724f0ae92208472428a9e442643310957 Parents: f6a2d54 Author: Jagadish <[email protected]> Authored: Thu Jun 14 14:04:53 2018 -0700 Committer: Jagadish <[email protected]> Committed: Thu Jun 14 14:04:53 2018 -0700 ---------------------------------------------------------------------- .../AbstractContainerAllocator.java | 5 +++++ .../HostAwareContainerAllocator.java | 10 +++++++++ .../clustermanager/SamzaApplicationState.java | 8 +++++++ .../ContainerProcessManagerMetrics.scala | 22 ++++++++++++-------- .../TestHostAwareContainerAllocator.java | 8 +++++++ 5 files changed, 44 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/5c33fe77/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java index 521a43b..9f1afed 100644 --- 
a/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/AbstractContainerAllocator.java @@ -207,6 +207,11 @@ public abstract class AbstractContainerAllocator implements Runnable { preferredHost, containerID); resourceRequestState.addResourceRequest(request); state.containerRequests.incrementAndGet(); + if (ResourceRequestState.ANY_HOST.equals(preferredHost)) { + state.anyHostRequests.incrementAndGet(); + } else { + state.preferredHostRequests.incrementAndGet(); + } } /** http://git-wip-us.apache.org/repos/asf/samza/blob/5c33fe77/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java index fe462e7..d59a893 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/HostAwareContainerAllocator.java @@ -79,6 +79,7 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator { boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST); if (expired) { + updateExpiryMetrics(request); if (resourceAvailableOnAnyHost) { log.info("Request for container: {} on {} has expired. 
Running on ANY_HOST", request.getContainerID(), request.getPreferredHost()); runStreamProcessor(request, ResourceRequestState.ANY_HOST); @@ -109,4 +110,13 @@ public class HostAwareContainerAllocator extends AbstractContainerAllocator { } return requestExpired; } + + private void updateExpiryMetrics(SamzaResourceRequest request) { + String preferredHost = request.getPreferredHost(); + if (ResourceRequestState.ANY_HOST.equals(preferredHost)) { + state.expiredAnyHostRequests.incrementAndGet(); + } else { + state.expiredPreferredHostRequests.incrementAndGet(); + } + } } http://git-wip-us.apache.org/repos/asf/samza/blob/5c33fe77/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java index 0dcaace..4e6fc33 100644 --- a/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java +++ b/samza-core/src/main/java/org/apache/samza/clustermanager/SamzaApplicationState.java @@ -125,6 +125,14 @@ public class SamzaApplicationState { public final AtomicInteger matchedResourceRequests = new AtomicInteger(0); + public final AtomicInteger preferredHostRequests = new AtomicInteger(0); + + public final AtomicInteger anyHostRequests = new AtomicInteger(0); + + public final AtomicInteger expiredPreferredHostRequests = new AtomicInteger(0); + + public final AtomicInteger expiredAnyHostRequests = new AtomicInteger(0); + /** * Number of invalid container notifications. 
* http://git-wip-us.apache.org/repos/asf/samza/blob/5c33fe77/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala index c396ed6..15cb18f 100644 --- a/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/metrics/ContainerProcessManagerMetrics.scala @@ -53,16 +53,20 @@ class ContainerProcessManagerMetrics( val mReleasedContainers = newGauge("released-containers", () => state.releasedContainers.get()) val mContainers = newGauge("container-count", () => state.containerCount) val mRedundantNotifications = newGauge("redundant-notifications", () => state.redundantNotifications.get()) - val mJobHealthy = newGauge("job-healthy", () => if (state.jobHealthy.get()) 1 else 0) - val mLocalityMatchedRequests = newGauge( - "locality-matched", - () => { - if (state.containerRequests.get() != 0) { - state.matchedResourceRequests.get() / state.containerRequests.get() - } else { - 0L - } + val mPreferredHostRequests = newGauge("preferred-host-requests", () => state.preferredHostRequests.get()) + val mAnyHostRequests = newGauge("any-host-requests", () => state.anyHostRequests.get()) + val mExpiredPreferredHostRequests = newGauge("expired-preferred-host-requests", () => state.expiredPreferredHostRequests.get()) + val mExpiredAnyHostRequests = newGauge("expired-any-host-requests", () => state.expiredAnyHostRequests.get()) + + val mHostAffinityMatchPct = newGauge("host-affinity-match-pct", () => { + val numPreferredHostRequests = state.preferredHostRequests.get() + val numExpiredPreferredHostRequests = state.expiredPreferredHostRequests.get() + if (numPreferredHostRequests != 0) { + 100.00 * 
(numPreferredHostRequests - numExpiredPreferredHostRequests) / numPreferredHostRequests + } else { + 0L + } }) jvm.start http://git-wip-us.apache.org/repos/asf/samza/blob/5c33fe77/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java index 490de51..b9fc2e5 100644 --- a/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java +++ b/samza-core/src/test/java/org/apache/samza/clustermanager/TestHostAwareContainerAllocator.java @@ -394,9 +394,17 @@ public class TestHostAwareContainerAllocator { Assert.fail("Timed out waiting for container-0 to launch"); } // verify that the second preferred host request should expire and should trigger ANY_HOST requests + // wait for 4 requests to be made (2 preferred-host requests - one each on host-1 & host-2; 2 any-host requests) if (!clusterResourceManager.awaitResourceRequests(4, 20, TimeUnit.SECONDS)) { Assert.fail("Timed out waiting for resource requests"); } + // verify 2 preferred host requests should have been made for host-1 and host-2 + Assert.assertEquals(2, state.preferredHostRequests.get()); + // verify both of them should have expired. + Assert.assertEquals(2, state.expiredPreferredHostRequests.get()); + // verify there were at-least 2 any-host requests + Assert.assertTrue(state.anyHostRequests.get() >= 2); + Assert.assertTrue(state.expiredAnyHostRequests.get() <= state.anyHostRequests.get()); // finally, provide a container from YARN after multiple requests containerAllocator.addResource(resource1); // verify all the test assertions
