prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r614976079



##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -280,6 +282,7 @@ class TaskInstance(
       if (timeSinceLastCommit < commitMaxDelayMs) {
         info("Skipping commit for taskName: %s since another commit is in progress. " +
           "%s ms have elapsed since the pending commit started." format (taskName, timeSinceLastCommit))
+        metrics.asyncCommitSkipped.set(numSkippedCommits + 1)

Review comment:
       This is always 1. You probably need to do 
`metrics.asyncCommitSkipped.set(metrics.asyncCommitSkipped.get + 1)`.

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -335,8 +341,21 @@ class TaskInstance(
         debug("Starting async stage of commit for taskName: %s checkpointId: %s" format (taskName, checkpointId))
 
         try {
+          val uploadStartTimeNs = System.nanoTime()
           val uploadSCMsFuture = commitManager.upload(checkpointId, snapshotSCMs)
 
+          uploadSCMsFuture.whenComplete(new BiConsumer[util.Map[String, util.Map[String, String]], Throwable] {
+            override def accept(t: util.Map[String, util.Map[String, String]], throwable: Throwable): Unit = {
+              if (throwable == null) {
+                metrics.asyncUploadNs.update(System.nanoTime() - uploadStartTimeNs)
+                metrics.asyncUploadsCompleted.inc()
+              } else {
+                debug("Commit upload did not complete successfully for taskName: %s checkpointId: %s with error msg: %s"
+                  format (taskName, checkpointId, throwable.getMessage))
+              }
+            }
+          })

Review comment:
       Add a metric for cleanupNs as well.

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")
+  val asyncUploadNs = newTimer("async-upload-ns")
+  val commitNs = newTimer("commit-ns")

Review comment:
       Does it make sense to have a container-wide version of these metrics for an overview as well (in addition to the per-task versions)?

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -280,6 +282,7 @@ class TaskInstance(
       if (timeSinceLastCommit < commitMaxDelayMs) {
         info("Skipping commit for taskName: %s since another commit is in progress. " +
           "%s ms have elapsed since the pending commit started." format (taskName, timeSinceLastCommit))
+        metrics.asyncCommitSkipped.set(numSkippedCommits + 1)

Review comment:
       Also the metric name should just be commitsSkipped / skippedCommits.

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -453,6 +472,11 @@ class TaskInstance(
                 "during async stage of commit for taskName: %s checkpointId: %s. New exception logged above. " +
                 "Saved exception under Caused By.", commitException.get())
             }
+          } else {
+            metrics.commitNs.update(System.nanoTime() - commitStartNs)
+            // reset the number of skipped commits for the current commit
+            numSkippedCommits = 0

Review comment:
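       It may not be correct to reset this gauge, since the default metrics reporting interval is 1 minute, which is longer than a typical commit interval. E.g., if you set task.commit.ms = 10 secs and 5 commits are skipped within one reporting interval, a later successful commit resets the gauge before it is reported, so the reported value may incorrectly read 0. Better to leave this as a monotonically increasing value.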

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")
+  val asyncUploadNs = newTimer("async-upload-ns")
+  val commitNs = newTimer("commit-ns")

Review comment:
       Minor: order these hierarchically: commitNs -> [snapshotNs, 
asyncCommitNs -> [uploadNs, checkpointWriteNs, cleanupNs]]

##########
File path: 
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
   val flushes = newCounter("flush-calls")
   val pendingMessages = newGauge("pending-messages", 0)
   val messagesInFlight = newGauge("messages-in-flight", 0)
-  val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+  val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+  val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+  val asyncUploadsCompleted = newCounter("async-uploads-completed")

Review comment:
       Would it be better to just have commitsSkipped and commitsCompleted for 
the overall commit? How would you use this metric in isolation?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to