Repository: samza Updated Branches: refs/heads/master 5431350b7 -> c99e57133
SAMZA-1631: Improve logging on Task callback timeout Author: Prateek Maheshwari <[email protected]> Reviewers: Jacob Maes <[email protected]> Closes #461 from prateekm/task-callback-logging Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c99e5713 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c99e5713 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c99e5713 Branch: refs/heads/master Commit: c99e57133f5bedab2355f55aefcc7bee8a93cc75 Parents: 5431350 Author: Prateek Maheshwari <[email protected]> Authored: Wed Mar 28 17:52:58 2018 -0700 Committer: Prateek Maheshwari <[email protected]> Committed: Wed Mar 28 17:52:58 2018 -0700 ---------------------------------------------------------------------- .../org/apache/samza/task/AsyncRunLoop.java | 28 ++++++------- .../org/apache/samza/task/TaskCallbackImpl.java | 22 +++++----- .../apache/samza/task/TaskCallbackManager.java | 9 +++-- .../task/TaskCallbackTimeoutException.java | 42 -------------------- 4 files changed, 33 insertions(+), 68 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java index f4b1d41..589fbb8 100644 --- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java @@ -455,17 +455,17 @@ public class AsyncRunLoop implements Runnable, Throttleable { task.window(coordinator); containerMetrics.windowNs().update(clock.nanoTime() - startTime); - // A window() that executes for more than task.window.ms, will starve the next process() call - // when the application has job.thread.pool.size > 1. This is due to prioritizing window() ahead of process() - // to guarantee window() will fire close to its trigger interval time. - // We warn the users if the average window execution time is greater than equals to window trigger interval. - long lowerBoundForWindowTriggerTimeInMs = TimeUnit.NANOSECONDS - .toMillis((long) containerMetrics.windowNs().getSnapshot().getAverage()); - if (windowMs <= lowerBoundForWindowTriggerTimeInMs) { - log.warn( - "window() call might potentially starve process calls." - + " Consider setting task.window.ms > {} ms", - lowerBoundForWindowTriggerTimeInMs); + /** + * Window calls that execute for more than task.window.ms will starve process calls + * since window has higher priority than process in {@link AsyncTaskState#nextOp()}. + * Warn the users if this is the case. + */ + long averageWindowMs = TimeUnit.NANOSECONDS.toMillis( + (long) containerMetrics.windowNs().getSnapshot().getAverage()); + if (averageWindowMs >= windowMs) { + log.warn("Average window call duration {} is greater than the configured task.window.ms {}. " + + "This can starve process calls, so consider setting task.window.ms >> {} ms.", + new Object[]{averageWindowMs, windowMs, averageWindowMs}); } coordinatorRequests.update(coordinator); @@ -589,7 +589,7 @@ public class AsyncRunLoop implements Runnable, Throttleable { coordinatorRequests.update(callbackToUpdate.coordinator); } } catch (Throwable t) { - log.error(t.getMessage(), t); + log.error("Error marking process as complete.", t); abort(t); } finally { resume(); @@ -610,9 +610,9 @@ public class AsyncRunLoop implements Runnable, Throttleable { abort(t); // update pending count, but not offset TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback; - log.error("Got callback failure for task {}", callbackImpl.taskName); + log.error("Got callback failure for task {}", callbackImpl.taskName, t); } catch (Throwable e) { - log.error(e.getMessage(), e); + log.error("Error marking process as failed.", e); } finally { resume(); } http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java index 19b9f1c..b09db62 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java @@ -21,6 +21,8 @@ package org.apache.samza.task; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.samza.SamzaException; import org.apache.samza.container.TaskName; import org.apache.samza.system.IncomingMessageEnvelope; import org.slf4j.Logger; @@ -63,14 +65,15 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> { if (scheduledFuture != null) { scheduledFuture.cancel(true); } - log.trace("Callback complete for ssp {} offset {}.", envelope.getSystemStreamPartition(), envelope.getOffset()); + log.trace("Callback complete for task {}, ssp {}, offset {}.", + new Object[] {taskName, envelope.getSystemStreamPartition(), envelope.getOffset()}); if (isComplete.compareAndSet(false, true)) { listener.onComplete(this); } else { - Throwable throwable = new IllegalStateException("TaskCallback complete has been invoked after completion"); - log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope}, throwable); - listener.onFailure(this, throwable); + String msg = String.format("Callback complete was invoked after completion for task {}, ssp {}, offset {}.", + taskName, envelope.getSystemStreamPartition(), envelope.getOffset()); + listener.onFailure(this, new IllegalStateException(msg)); } } @@ -79,14 +82,15 @@ class TaskCallbackImpl implements TaskCallback, Comparable<TaskCallbackImpl> { if (scheduledFuture != null) { scheduledFuture.cancel(true); } - log.error("Callback fails for task {} envelope {}.", new Object[] {taskName, envelope}, t); if (isComplete.compareAndSet(false, true)) { - listener.onFailure(this, t); + String msg = String.format("Callback failed for task %s, ssp %s, offset %s.", + taskName, envelope.getSystemStreamPartition(), envelope.getOffset()); + listener.onFailure(this, new SamzaException(msg, t)); } else { - Throwable throwable = new IllegalStateException("TaskCallback failure has been invoked after completion", t); - log.error("Callback for process task {}, envelope {}.", new Object[] {taskName, envelope}, throwable); - listener.onFailure(this, throwable); + String msg = String.format("Task callback failure was invoked after completion for task %s, ssp %s, offset %s.", + taskName, envelope.getSystemStreamPartition(), envelope.getOffset()); + listener.onFailure(this, new IllegalStateException(msg, t)); } } http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java index 370cb1a..c773368 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java @@ -27,6 +27,8 @@ import java.util.Queue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + +import org.apache.samza.SamzaException; import org.apache.samza.container.TaskName; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.util.HighResolutionClock; @@ -92,13 +94,14 @@ class TaskCallbackManager { public TaskCallbackImpl createCallback(TaskName taskName, IncomingMessageEnvelope envelope, ReadableCoordinator coordinator) { - final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++, clock.nanoTime()); + final TaskCallbackImpl callback = + new TaskCallbackImpl(listener, taskName, envelope, coordinator, seqNum++, clock.nanoTime()); if (timer != null) { Runnable timerTask = new Runnable() { @Override public void run() { - String msg = "Task " + callback.taskName + " callback times out"; - callback.failure(new TaskCallbackTimeoutException(msg)); + String msg = "Callback for task {} " + callback.taskName + " timed out after " + timeout + " ms."; + callback.failure(new SamzaException(msg)); } }; ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java deleted file mode 100644 index bf7f13c..0000000 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.samza.task; - -import org.apache.samza.SamzaException; - - -/** - * Specific {@link SamzaException}s thrown when a task callback times out - */ -public class TaskCallbackTimeoutException extends SamzaException { - private static final long serialVersionUID = -2342134146355610665L; - - public TaskCallbackTimeoutException(Throwable e) { - super(e); - } - - public TaskCallbackTimeoutException(String msg) { - super(msg); - } - - public TaskCallbackTimeoutException(String msg, Throwable e) { - super(msg, e); - } -} \ No newline at end of file
