dxichen commented on code in PR #1681:
URL: https://github.com/apache/samza/pull/1681#discussion_r1305926076
##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -629,9 +640,22 @@ public TaskCallback createCallback() {
containerMetrics.processes().inc();
// report 1 whenever the contaienr is running. Can be used to
calculate the number of containers not running
containerMetrics.containerRunning().set(1L);
- return isDraining && (envelope.isDrain() || envelope.isWatermark())
- ? callbackManager.createCallbackForDrain(task.taskName(),
envelope, coordinator, drainCallbackTimeoutMs)
- : callbackManager.createCallback(task.taskName(), envelope,
coordinator);
+
+ /*
+ * Timeout used in the task callback. The value is determined based
on the following logic
+ * 1. If run loop is in draining mode and the envelope is drain, use
drainCallbackTimeoutMs
+ * 2. If the envelope is watermark, use watermarkCallbackTimeoutMs
regardless of the modes
Review Comment:
Do we ever expect to set the default for drain higher than this watermark
timeout? If that is the case it will impact existing drain behavior causing it
to default to task.callback.ms on watermarks. We could consider taking the
`Max(watermark timeout, drain timeout)` when they are both enabled
if watermark should be higher than drain when both conditions apply, lets
state that assumption
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]