Repository: samza Updated Branches: refs/heads/master a671288e1 -> 7ad6631bb
SAMZA-1429: Add callback success/failure metrics to async tasks. jmakes & nickpan47, please take a look when you get the chance. Author: Daniel Nishimura <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #306 from dnishimura/samza-1429-async-metrics Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/7ad6631b Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/7ad6631b Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/7ad6631b Branch: refs/heads/master Commit: 7ad6631bbfca88d759e2cf5d4c0beeb5ef4eb013 Parents: a671288 Author: Daniel Nishimura <[email protected]> Authored: Thu Oct 5 09:18:37 2017 -0700 Committer: Jacob Maes <[email protected]> Committed: Thu Oct 5 09:18:37 2017 -0700 ---------------------------------------------------------------------- .../org/apache/samza/task/AsyncRunLoop.java | 1 + .../samza/container/TaskInstanceMetrics.scala | 1 + .../org/apache/samza/task/TestAsyncRunLoop.java | 40 ++++++++++++++++++++ 3 files changed, 42 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/7ad6631b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index 58af820..b8f48c7 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -527,6 +527,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { public void run() { try { state.doneProcess(); + state.taskMetrics.asyncCallbackCompleted().inc(); TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; containerMetrics.processNs().update(clock.nanoTime() - callbackImpl.timeCreatedNs); log.trace("Got callback 
complete for task {}, ssp {}", http://git-wip-us.apache.org/repos/asf/samza/blob/7ad6631b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala ---------------------------------------------------------------------- diff --git a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala index ffd9e7a..94cfbdc 100644 --- a/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala +++ b/samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala @@ -36,6 +36,7 @@ class TaskInstanceMetrics( val flushes = newCounter("flush-calls") val pendingMessages = newGauge("pending-messages", 0) val messagesInFlight = newGauge("messages-in-flight", 0) + val asyncCallbackCompleted = newCounter("async-callback-complete-calls"); def addOffsetGauge(systemStreamPartition: SystemStreamPartition, getValue: () => String) { newGauge("%s-%s-%d-offset" format (systemStreamPartition.getSystem, systemStreamPartition.getStream, systemStreamPartition.getPartition.getPartitionId), getValue) http://git-wip-us.apache.org/repos/asf/samza/blob/7ad6631b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java index 5a4b4bf..b399f5f 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestAsyncRunLoop.java @@ -46,6 +46,7 @@ import org.apache.samza.system.SystemConsumers; import org.apache.samza.system.SystemStreamPartition; import org.apache.samza.system.TestSystemConsumers; import org.junit.Before; +import org.junit.Test; import scala.Option; import scala.collection.JavaConverters; @@ -200,6 +201,45 @@ public class 
TestAsyncRunLoop { when(consumerMultiplexer.pollIntervalMs()).thenReturn(10); } + @Test + public void testMetrics() throws Exception { + CountDownLatch task0ProcessedMessages = new CountDownLatch(2); + CountDownLatch task1ProcessedMessages = new CountDownLatch(1); + + TestTask task0 = new TestTask(true, true, false, task0ProcessedMessages); + TestTask task1 = new TestTask(true, true, false, task1ProcessedMessages); + TaskInstance t0 = createTaskInstance(task0, taskName0, ssp0); + TaskInstance t1 = createTaskInstance(task1, taskName1, ssp1); + + Map<TaskName, TaskInstance> tasks = new HashMap<>(); + tasks.put(taskName0, t0); + tasks.put(taskName1, t1); + //task0.callbackHandler = buildOutofOrderCallback(task0); + + int maxMessagesInFlight = 1; + AsyncRunLoop runLoop = new AsyncRunLoop(tasks, executor, consumerMultiplexer, maxMessagesInFlight, windowMs, commitMs, + callbackTimeoutMs, maxThrottlingDelayMs, containerMetrics, () -> 0L, false); + + when(consumerMultiplexer.choose(false)) + .thenReturn(envelope0) + .thenReturn(envelope3) + .thenReturn(envelope1) + .thenReturn(null) + .thenReturn(ssp0EndOfStream) + .thenReturn(ssp1EndOfStream) + .thenReturn(null); + + runLoop.run(); + + task0ProcessedMessages.await(); + task1ProcessedMessages.await(); + + assertEquals(2L, t0.metrics().asyncCallbackCompleted().getCount()); + assertEquals(1L, t1.metrics().asyncCallbackCompleted().getCount()); + assertEquals(5L, containerMetrics.envelopes().getCount()); + assertEquals(3L, containerMetrics.processes().getCount()); + } + //@Test public void testProcessMultipleTasks() throws Exception { CountDownLatch task0ProcessedMessages = new CountDownLatch(1);
