mynameborat commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1013403151
##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
callback.failure(new SamzaException(msg));
}
};
- ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+ final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())
+ ? timer.schedule(timerTask, drainCallbackTimeout, TimeUnit.MILLISECONDS)
+ : timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
Review Comment:
Can we add unit tests for this change to ensure we use `drainCallbackTimeout` vs.
`timeout` appropriately?
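
A rough sketch of the cases I have in mind. It assumes the selection logic is pulled into a hypothetical static helper, `selectTimeoutMs`, which does not exist in the PR and only mirrors the ternary in the diff; the real test would presumably go through `TaskCallbackManager` instead.

```java
public class DrainTimeoutSelectionSketch {
  // Hypothetical helper (not part of the PR): mirrors the ternary from the diff.
  static long selectTimeoutMs(boolean isDraining, boolean isDrain, boolean isWatermark,
      long timeout, long drainCallbackTimeout) {
    return isDraining && (isDrain || isWatermark) ? drainCallbackTimeout : timeout;
  }

  // Minimal assertion helper so the sketch runs without a test framework.
  static void check(long expected, long actual, String label) {
    if (expected != actual) {
      throw new AssertionError(label + ": expected " + expected + " but got " + actual);
    }
  }

  public static void main(String[] args) {
    long timeout = 1000L;
    long drainCallbackTimeout = 5000L;

    // Draining and the envelope is a drain or watermark message: drain timeout.
    check(drainCallbackTimeout,
        selectTimeoutMs(true, true, false, timeout, drainCallbackTimeout), "draining + drain");
    check(drainCallbackTimeout,
        selectTimeoutMs(true, false, true, timeout, drainCallbackTimeout), "draining + watermark");

    // Draining but a regular envelope: regular timeout.
    check(timeout,
        selectTimeoutMs(true, false, false, timeout, drainCallbackTimeout), "draining + regular");

    // Not draining: regular timeout even for drain or watermark envelopes.
    check(timeout,
        selectTimeoutMs(false, true, false, timeout, drainCallbackTimeout), "not draining + drain");
    check(timeout,
        selectTimeoutMs(false, false, true, timeout, drainCallbackTimeout), "not draining + watermark");
  }
}
```

The last two cases are the ones I would most like to see pinned down, since they depend on `isDraining` and not just on the envelope type.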
##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
callback.failure(new SamzaException(msg));
}
};
- ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+ final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())
Review Comment:
Why do we need both `isDraining` and `envelope.isDrain()`? What happens if
the run loop has not yet propagated its intent to drain but the envelope is a
drain message? Will it be treated as regular message processing?
##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -90,6 +90,13 @@ public class TaskConfig extends MapConfig {
// timeout for triggering a callback
public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+ // timeout for triggering a callback during drain
+ public static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
+
+ // default timeout for triggering a callback during drain
+ static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L;
Review Comment:
What does the `-1L` default denote? Does it mean we wait forever if the drain
message flow gets stuck?
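
For context on why I am asking: `ScheduledExecutorService.schedule` treats zero and negative delays as requests for immediate execution, so `-1L` only means "no timeout" if something guards against scheduling the timer at all. A minimal sketch of such a guard, assuming (my assumption, please confirm) that a non-positive value is meant to disable the timer; `scheduleIfEnabled` is a hypothetical name, not something in the PR:

```java
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class DrainTimeoutDefaultSketch {
  // Hypothetical guard: a non-positive timeout is assumed to mean "never time out".
  static Optional<ScheduledFuture<?>> scheduleIfEnabled(
      ScheduledExecutorService timer, Runnable timerTask, long timeoutMs) {
    if (timeoutMs <= 0) {
      // No timer is scheduled, so the callback can wait indefinitely.
      return Optional.empty();
    }
    ScheduledFuture<?> future = timer.schedule(timerTask, timeoutMs, TimeUnit.MILLISECONDS);
    return Optional.of(future);
  }

  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      // With the -1L default, nothing is scheduled.
      System.out.println(scheduleIfEnabled(timer, () -> { }, -1L).isPresent());
      // With a positive value, a timer is scheduled.
      System.out.println(scheduleIfEnabled(timer, () -> { }, 60_000L).isPresent());
    } finally {
      // Cancel the pending timer so the JVM can exit.
      timer.shutdownNow();
    }
  }
}
```

If that is the intended meaning, it would be worth spelling it out in the comment above the constant.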
##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
callback.failure(new SamzaException(msg));
}
};
- ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+ final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())
Review Comment:
It might be easier to drive this from one place rather than splitting the
logic across multiple places. For example, would adding a new method such as
`createCallback(..., timeout)` fit better here? That way all the logic about
drain, watermark, and the current state stays abstracted away from this piece
of code (ideally that's how it should be), and the caller determines the
timeout based on its needs.
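
Roughly what I mean, as a self-contained sketch. `Envelope`, `CallbackManager`, and `timeoutFor` here are simplified stand-ins I made up for illustration; they are not the actual Samza classes or signatures.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class CreateCallbackSketch {
  // Stand-in for the envelope: only the two checks used in the diff.
  interface Envelope {
    boolean isDrain();
    boolean isWatermark();
  }

  static Envelope envelope(final boolean drain, final boolean watermark) {
    return new Envelope() {
      @Override public boolean isDrain() { return drain; }
      @Override public boolean isWatermark() { return watermark; }
    };
  }

  // Callback manager side: schedules with whatever timeout it is given and
  // knows nothing about drain or watermark.
  static class CallbackManager {
    private final ScheduledExecutorService timer;

    CallbackManager(ScheduledExecutorService timer) {
      this.timer = timer;
    }

    ScheduledFuture<?> createCallback(Runnable onTimeout, long timeoutMs) {
      return timer.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
    }
  }

  // Caller side: the only place that decides which timeout applies.
  static long timeoutFor(Envelope envelope, boolean isDraining,
      long timeout, long drainCallbackTimeout) {
    return isDraining && (envelope.isDrain() || envelope.isWatermark())
        ? drainCallbackTimeout
        : timeout;
  }

  public static void main(String[] args) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      CallbackManager manager = new CallbackManager(timer);
      long chosen = timeoutFor(envelope(true, false), true, 1000L, 5000L);
      ScheduledFuture<?> future = manager.createCallback(() -> { }, chosen);
      future.cancel(false); // the callback completed in time, so cancel the timer
    } finally {
      timer.shutdownNow();
    }
  }
}
```

With this split, the drain-specific decision lives with the caller and is easy to unit test on its own, while the callback manager stays unchanged apart from accepting the timeout as a parameter.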