prateekm commented on a change in pull request #1429:
URL: https://github.com/apache/samza/pull/1429#discussion_r614976079
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -280,6 +282,7 @@ class TaskInstance(
if (timeSinceLastCommit < commitMaxDelayMs) {
info("Skipping commit for taskName: %s since another commit is in
progress. " +
"%s ms have elapsed since the pending commit started." format
(taskName, timeSinceLastCommit))
+ metrics.asyncCommitSkipped.set(numSkippedCommits + 1)
Review comment:
This always sets the gauge to 1. You probably need to do
`metrics.asyncCommitSkipped.set(metrics.asyncCommitSkipped.get + 1)`.
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -335,8 +341,21 @@ class TaskInstance(
debug("Starting async stage of commit for taskName: %s checkpointId:
%s" format (taskName, checkpointId))
try {
+ val uploadStartTimeNs = System.nanoTime()
val uploadSCMsFuture = commitManager.upload(checkpointId,
snapshotSCMs)
+ uploadSCMsFuture.whenComplete(new BiConsumer[util.Map[String,
util.Map[String, String]], Throwable] {
+ override def accept(t: util.Map[String, util.Map[String, String]],
throwable: Throwable): Unit = {
+ if (throwable == null) {
+ metrics.asyncUploadNs.update(System.nanoTime() -
uploadStartTimeNs)
+ metrics.asyncUploadsCompleted.inc()
+ } else {
+ debug("Commit upload did not complete successfully for
taskName: %s checkpointId: %s with error msg: %s"
+ format (taskName, checkpointId, throwable.getMessage))
+ }
+ }
+ })
Review comment:
Add a metric for cleanupNs as well.
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
val flushes = newCounter("flush-calls")
val pendingMessages = newGauge("pending-messages", 0)
val messagesInFlight = newGauge("messages-in-flight", 0)
- val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+ val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+ val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+ val asyncUploadsCompleted = newCounter("async-uploads-completed")
+ val asyncUploadNs = newTimer("async-upload-ns")
+ val commitNs = newTimer("commit-ns")
Review comment:
Does it make sense to also have a container-wide version of these metrics for
an overview (in addition to the per-task version)?
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -280,6 +282,7 @@ class TaskInstance(
if (timeSinceLastCommit < commitMaxDelayMs) {
info("Skipping commit for taskName: %s since another commit is in
progress. " +
"%s ms have elapsed since the pending commit started." format
(taskName, timeSinceLastCommit))
+ metrics.asyncCommitSkipped.set(numSkippedCommits + 1)
Review comment:
Also, the metric name should just be commitsSkipped or skippedCommits.
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala
##########
@@ -453,6 +472,11 @@ class TaskInstance(
"during async stage of commit for taskName: %s checkpointId:
%s. New exception logged above. " +
"Saved exception under Caused By.", commitException.get())
}
+ } else {
+ metrics.commitNs.update(System.nanoTime() - commitStartNs)
+ // reset the numbers skipped commits for the current commit
+ numSkippedCommits = 0
Review comment:
It may not be correct to reset this gauge, since the default metrics reporting
interval is 1 minute. E.g., if you set task.commit.ms = 10 secs and skip 5
commits, the gauge may already have been reset to 0 by the time it is reported,
so the skipped commits are never observed. Better to leave this as an
increment-only value.
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
val flushes = newCounter("flush-calls")
val pendingMessages = newGauge("pending-messages", 0)
val messagesInFlight = newGauge("messages-in-flight", 0)
- val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+ val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+ val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+ val asyncUploadsCompleted = newCounter("async-uploads-completed")
+ val asyncUploadNs = newTimer("async-upload-ns")
+ val commitNs = newTimer("commit-ns")
Review comment:
Minor: order these hierarchically: commitNs -> [snapshotNs,
asyncCommitNs -> [uploadNs, checkpointWriteNs, cleanupNs]]
##########
File path:
samza-core/src/main/scala/org/apache/samza/container/TaskInstanceMetrics.scala
##########
@@ -37,7 +37,14 @@ class TaskInstanceMetrics(
val flushes = newCounter("flush-calls")
val pendingMessages = newGauge("pending-messages", 0)
val messagesInFlight = newGauge("messages-in-flight", 0)
- val asyncCallbackCompleted = newCounter("async-callback-complete-calls");
+ val asyncCallbackCompleted = newCounter("async-callback-complete-calls")
+
+ val asyncCommitSkipped = newGauge("async-commits-skipped", 0)
+ val asyncUploadsCompleted = newCounter("async-uploads-completed")
Review comment:
Would it be better to just have commitsSkipped and commitsCompleted for
the overall commit? How would you use the async-uploads-completed metric in isolation?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]