xinyuiscool commented on code in PR #1616:
URL: https://github.com/apache/samza/pull/1616#discussion_r906439300
##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -514,18 +548,46 @@ private void run() {
case END_OF_STREAM:
endOfStream();
break;
+ case DRAIN:
+ drain();
+ break;
default:
//no op
break;
}
}
+ /**
+ * Called when a task has drained i.e all SSPs for the task have received a drain message.
+ * */
+ private void drain() {
+ state.complete = true;
+ state.startDrain();
+ try {
+ ReadableCoordinator coordinator = new ReadableCoordinator(task.taskName());
+
+ task.drain(coordinator);
+
+ // issue a shutdown request for the task
+ coordinator.shutdown(TaskCoordinator.RequestScope.CURRENT_TASK);
+ coordinatorRequests.update(coordinator);
+
+ // issue a commit explicitly before we shutdown the task
+ // Adding commit to coordinator will not work as the state is marked complete and NO_OP will always be the
+ // next operation for this task
+ commit();
Review Comment:
Can we guarantee that this commit is synchronous, i.e. that it completes
before the task shuts down?
##########
samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala:
##########
@@ -237,6 +233,13 @@ class SystemConsumers (
}
}
+ def drain(): Unit = {
+ if (!isDraining) {
+ isDraining = true;
+ info("SystemConsumers is set to drain mode.")
+ writeDrainControlMessageToSspQueue()
Review Comment:
Since the Kafka consumer spawns its own thread, there seems to be a small
chance that more envelopes are added after the drain message. For example,
the consumer thread may be polling the consumer at the same moment we receive
the drain command. Is this situation handled? It would be good to document
that in the design doc.
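
To make the concern concrete, here is a minimal sketch in plain Java of one way to close that window. It is not Samza code: `DrainAwareQueue`, `offer`, `drain`, and `snapshot` are hypothetical names, and strings stand in for envelopes. The idea is that the drain marker is enqueued under the same lock that guards puts from the consumer thread, so nothing can land behind it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for a per-SSP envelope queue; not part of Samza.
public class DrainAwareQueue {
  static final String DRAIN_MARKER = "DRAIN";

  private final Queue<String> envelopes = new ArrayDeque<>();
  private boolean draining = false;

  // Called by the consumer thread. Rejects envelopes once draining has started.
  public synchronized boolean offer(String envelope) {
    if (draining) {
      return false;
    }
    envelopes.add(envelope);
    return true;
  }

  // Called on the drain command. Idempotent: the marker is enqueued only once.
  public synchronized void drain() {
    if (!draining) {
      draining = true;
      envelopes.add(DRAIN_MARKER);
    }
  }

  // Copy of the current queue contents, for inspection.
  public synchronized List<String> snapshot() {
    return new ArrayList<>(envelopes);
  }

  public static void main(String[] args) throws InterruptedException {
    DrainAwareQueue queue = new DrainAwareQueue();

    // Simulated consumer thread that keeps adding envelopes.
    Thread poller = new Thread(() -> {
      for (int i = 0; i < 1000; i++) {
        queue.offer("msg-" + i);
      }
    });
    poller.start();

    // Drain command arrives while the poller may still be running.
    queue.drain();
    poller.join();

    List<String> contents = queue.snapshot();
    String last = contents.get(contents.size() - 1);
    System.out.println("marker is last: " + last.equals(DRAIN_MARKER));
  }
}
```

Because `offer` and `drain` share one monitor, the marker is always the last element regardless of how the two threads interleave. Whether the PR relies on something equivalent, or tolerates envelopes after the marker, is what I would like the design doc to spell out.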
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.