ajothomas commented on code in PR #1616:
URL: https://github.com/apache/samza/pull/1616#discussion_r923696627
##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -514,18 +548,46 @@ private void run() {
case END_OF_STREAM:
endOfStream();
break;
+ case DRAIN:
+ drain();
+ break;
default:
//no op
break;
}
}
+ /**
+ * Called when a task has drained i.e all SSPs for the task have received a drain message.
+ * */
+ private void drain() {
+ state.complete = true;
+ state.startDrain();
+ try {
+ ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
+
+ task.drain(coordinator);
+
+ // issue a shutdown request for the task
+ coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+ coordinatorRequests.update(coordinator);
+
+ // issue a commit explicitly before we shutdown the task
+ // Adding commit to coordinator will not work as the state is marked complete and NO_OP will always be the
+ // next operation for this task
+ commit();
Review Comment:
Yes, we do the following:
- Set isAsyncCommitEnabled to false.
- Commits then happen on the RunLoop thread instead of the commit executor.
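
To illustrate the two points above, here is a minimal, self-contained sketch of the idea, not the actual RunLoop code. Only the flag name isAsyncCommitEnabled comes from this comment; the class DrainCommitSketch, the thread name "commit-executor", and the methods doCommit(), commit(), drain() and close() are hypothetical stand-ins. It assumes that a commit is handed to an executor while the flag is true and runs inline on the calling thread once the flag is false.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: NOT the Samza RunLoop, only an illustration of the comment above.
public class DrainCommitSketch {
    // Flag name taken from the review comment; its real location and type are assumptions.
    private volatile boolean isAsyncCommitEnabled = true;

    // Stand-in for the commit executor; the thread name is made up for the demo.
    private final ExecutorService commitExecutor =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "commit-executor"));

    // Records which thread performed the most recent commit.
    private final AtomicReference<String> lastCommitThread = new AtomicReference<>();

    // Placeholder for the real commit work.
    private void doCommit() {
        lastCommitThread.set(Thread.currentThread().getName());
    }

    // Runs a commit and returns the name of the thread that executed it.
    String commit() throws Exception {
        if (isAsyncCommitEnabled) {
            // Async path: hand the commit to the executor (we wait here only for the demo).
            commitExecutor.submit(this::doCommit).get();
        } else {
            // Sync path: the commit runs on the caller's thread.
            doCommit();
        }
        return lastCommitThread.get();
    }

    // Drain path: disable async commits first, then commit on the calling thread.
    String drain() throws Exception {
        isAsyncCommitEnabled = false;
        return commit();
    }

    void close() {
        commitExecutor.shutdown();
    }

    public static void main(String[] args) throws Exception {
        DrainCommitSketch sketch = new DrainCommitSketch();
        System.out.println("regular commit ran on: " + sketch.commit());
        System.out.println("drain commit ran on:   " + sketch.drain());
        sketch.close();
    }
}
```

In this sketch the first commit runs on the executor thread, and the commit issued from drain() runs on whichever thread called drain(), which plays the role of the RunLoop thread here.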