dxichen commented on code in PR #1681:
URL: https://github.com/apache/samza/pull/1681#discussion_r1305926076


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -629,9 +640,22 @@ public TaskCallback createCallback() {
           containerMetrics.processes().inc();
           // report 1 whenever the contaienr is running. Can be used to 
calculate the number of containers not running
           containerMetrics.containerRunning().set(1L);
-          return isDraining && (envelope.isDrain() || envelope.isWatermark())
-              ? callbackManager.createCallbackForDrain(task.taskName(), 
envelope, coordinator, drainCallbackTimeoutMs)
-              : callbackManager.createCallback(task.taskName(), envelope, 
coordinator);
+
+          /*
+           * Timeout used in the task callback. The value is determined based 
on the following logic
+           * 1. If run loop is in draining mode and the envelope is drain, use 
drainCallbackTimeoutMs
+           * 2. If the envelope is watermark, use watermarkCallbackTimeoutMs 
regardless of the modes

Review Comment:
   Do we ever expect to set the default for drain higher than this watermark 
timeout? If that is the case it will impact existing drain behavior causing it 
to default to task.callback.ms on watermarks. We could consider taking the 
`Max(watermark timeout, drain timeout)` when they are both enabled
   
   if watermark should be higher than drain when both conditions apply, lets 
state that assumption



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to