Repository: samza
Updated Branches:
  refs/heads/master 5431350b7 -> c99e57133


SAMZA-1631: Improve logging on Task callback timeout

Author: Prateek Maheshwari <[email protected]>

Reviewers: Jacob Maes <[email protected]>

Closes #461 from prateekm/task-callback-logging


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c99e5713
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c99e5713
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c99e5713

Branch: refs/heads/master
Commit: c99e57133f5bedab2355f55aefcc7bee8a93cc75
Parents: 5431350
Author: Prateek Maheshwari <[email protected]>
Authored: Wed Mar 28 17:52:58 2018 -0700
Committer: Prateek Maheshwari <[email protected]>
Committed: Wed Mar 28 17:52:58 2018 -0700

----------------------------------------------------------------------
 .../org/apache/samza/task/AsyncRunLoop.java     | 28 ++++++-------
 .../org/apache/samza/task/TaskCallbackImpl.java | 22 +++++-----
 .../apache/samza/task/TaskCallbackManager.java  |  9 +++--
 .../task/TaskCallbackTimeoutException.java      | 42 --------------------
 4 files changed, 33 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java 
b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index f4b1d41..589fbb8 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -455,17 +455,17 @@ public class AsyncRunLoop implements Runnable, 
Throttleable {
             task.window(coordinator);
             containerMetrics.windowNs().update(clock.nanoTime() - startTime);
 
-            // A window() that executes for more than task.window.ms, will 
starve the next process() call
-            // when the application has job.thread.pool.size > 1. This is due 
to prioritizing window() ahead of process()
-            // to guarantee window() will fire close to its trigger interval 
time.
-            // We warn the users if the average window execution time is 
greater than equals to window trigger interval.
-            long lowerBoundForWindowTriggerTimeInMs = TimeUnit.NANOSECONDS
-                .toMillis((long) 
containerMetrics.windowNs().getSnapshot().getAverage());
-            if (windowMs <= lowerBoundForWindowTriggerTimeInMs) {
-              log.warn(
-                  "window() call might potentially starve process calls."
-                      + " Consider setting task.window.ms > {} ms",
-                  lowerBoundForWindowTriggerTimeInMs);
+            /**
+             * Window calls that execute for more than task.window.ms will 
starve process calls
+             * since window has higher priority than process in {@link 
AsyncTaskState#nextOp()}.
+             * Warn the users if this is the case.
+             */
+            long averageWindowMs = TimeUnit.NANOSECONDS.toMillis(
+                (long) containerMetrics.windowNs().getSnapshot().getAverage());
+            if (averageWindowMs >= windowMs) {
+              log.warn("Average window call duration {} is greater than the 
configured task.window.ms {}. " +
+                      "This can starve process calls, so consider setting 
task.window.ms >> {} ms.",
+                  new Object[]{averageWindowMs, windowMs, averageWindowMs});
             }
 
             coordinatorRequests.update(coordinator);
@@ -589,7 +589,7 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
               coordinatorRequests.update(callbackToUpdate.coordinator);
             }
           } catch (Throwable t) {
-            log.error(t.getMessage(), t);
+            log.error("Error marking process as complete.", t);
             abort(t);
           } finally {
             resume();
@@ -610,9 +610,9 @@ public class AsyncRunLoop implements Runnable, Throttleable 
{
         abort(t);
         // update pending count, but not offset
         TaskCallbackImpl callbackImpl = (TaskCallbackImpl) callback;
-        log.error("Got callback failure for task {}", callbackImpl.taskName);
+        log.error("Got callback failure for task {}", callbackImpl.taskName, 
t);
       } catch (Throwable e) {
-        log.error(e.getMessage(), e);
+        log.error("Error marking process as failed.", e);
       } finally {
         resume();
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java 
b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
index 19b9f1c..b09db62 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackImpl.java
@@ -21,6 +21,8 @@ package org.apache.samza.task;
 
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.slf4j.Logger;
@@ -63,14 +65,15 @@ class TaskCallbackImpl implements TaskCallback, 
Comparable<TaskCallbackImpl> {
     if (scheduledFuture != null) {
       scheduledFuture.cancel(true);
     }
-    log.trace("Callback complete for ssp {} offset {}.", 
envelope.getSystemStreamPartition(), envelope.getOffset());
+    log.trace("Callback complete for task {}, ssp {}, offset {}.",
+        new Object[] {taskName, envelope.getSystemStreamPartition(), 
envelope.getOffset()});
 
     if (isComplete.compareAndSet(false, true)) {
       listener.onComplete(this);
     } else {
-      Throwable throwable = new IllegalStateException("TaskCallback complete 
has been invoked after completion");
-      log.error("Callback for process task {}, envelope {}.", new Object[] 
{taskName, envelope}, throwable);
-      listener.onFailure(this, throwable);
+      String msg = String.format("Callback complete was invoked after 
completion for task {}, ssp {}, offset {}.",
+          taskName, envelope.getSystemStreamPartition(), envelope.getOffset());
+      listener.onFailure(this, new IllegalStateException(msg));
     }
   }
 
@@ -79,14 +82,15 @@ class TaskCallbackImpl implements TaskCallback, 
Comparable<TaskCallbackImpl> {
     if (scheduledFuture != null) {
       scheduledFuture.cancel(true);
     }
-    log.error("Callback fails for task {} envelope {}.", new Object[] 
{taskName, envelope}, t);
 
     if (isComplete.compareAndSet(false, true)) {
-      listener.onFailure(this, t);
+      String msg = String.format("Callback failed for task %s, ssp %s, offset 
%s.",
+          taskName, envelope.getSystemStreamPartition(), envelope.getOffset());
+      listener.onFailure(this, new SamzaException(msg, t));
     } else {
-      Throwable throwable = new IllegalStateException("TaskCallback failure 
has been invoked after completion", t);
-      log.error("Callback for process task {}, envelope {}.", new Object[] 
{taskName, envelope}, throwable);
-      listener.onFailure(this, throwable);
+      String msg = String.format("Task callback failure was invoked after 
completion for task %s, ssp %s, offset %s.",
+          taskName, envelope.getSystemStreamPartition(), envelope.getOffset());
+      listener.onFailure(this, new IllegalStateException(msg, t));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java 
b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
index 370cb1a..c773368 100644
--- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
+++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java
@@ -27,6 +27,8 @@ import java.util.Queue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+
+import org.apache.samza.SamzaException;
 import org.apache.samza.container.TaskName;
 import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.util.HighResolutionClock;
@@ -92,13 +94,14 @@ class TaskCallbackManager {
   public TaskCallbackImpl createCallback(TaskName taskName,
       IncomingMessageEnvelope envelope,
       ReadableCoordinator coordinator) {
-    final TaskCallbackImpl callback = new TaskCallbackImpl(listener, taskName, 
envelope, coordinator, seqNum++, clock.nanoTime());
+    final TaskCallbackImpl callback =
+        new TaskCallbackImpl(listener, taskName, envelope, coordinator, 
seqNum++, clock.nanoTime());
     if (timer != null) {
       Runnable timerTask = new Runnable() {
         @Override
         public void run() {
-          String msg = "Task " + callback.taskName + " callback times out";
-          callback.failure(new TaskCallbackTimeoutException(msg));
+          String msg = "Callback for task {} " + callback.taskName + " timed 
out after " + timeout + " ms.";
+          callback.failure(new SamzaException(msg));
         }
       };
       ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, 
TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/samza/blob/c99e5713/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
----------------------------------------------------------------------
diff --git 
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
 
b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
deleted file mode 100644
index bf7f13c..0000000
--- 
a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackTimeoutException.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.task;
-
-import org.apache.samza.SamzaException;
-
-
-/**
- * Specific {@link SamzaException}s thrown when a task callback times out
- */
-public class TaskCallbackTimeoutException extends SamzaException {
-  private static final long serialVersionUID = -2342134146355610665L;
-
-  public TaskCallbackTimeoutException(Throwable e) {
-    super(e);
-  }
-
-  public TaskCallbackTimeoutException(String msg) {
-    super(msg);
-  }
-
-  public TaskCallbackTimeoutException(String msg, Throwable e) {
-    super(msg, e);
-  }
-}
\ No newline at end of file

Reply via email to