ajothomas commented on code in PR #1616:
URL: https://github.com/apache/samza/pull/1616#discussion_r927158298


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -805,6 +877,52 @@ private boolean checkEndOfStream() {
       return processingSspSet.isEmpty();
     }
 
+    private boolean shouldDrain() {
+      if (endOfStream) {
+        return false;
+      }
+
+      if (!pendingEnvelopeQueue.isEmpty()) {
+        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+
+        if (envelope.isDrain()) {
+          final DrainMessage message = (DrainMessage) envelope.getMessage();
+          if (!message.getDeploymentId().equals(deploymentId)) {
+            // Removing the drain message from the pending queue as it doesn't match with the current deploymentId
+            // Removing it will ensure that it is not picked up by process()
+            pendingEnvelopeQueue.remove();
+          } else {
+            if (elasticityFactor <= 1) {
+              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
+              processingSspSetToDrain.remove(ssp);
+            } else {
+              // if envelope contains drain message,
+              // the ssp of envelope should be removed from task's processing set irresp of keyBucket
+              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
+              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
+                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
+                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
+                  .findFirst();
+              ssp.ifPresent(processingSspSetToDrain::remove);
+            }
+
+            if (!hasIntermediateStreams) {

Review Comment:
   This condition is necessary for the high-level API; otherwise, RunLoop will
simply stop the task when the drain message is seen for the final SSP. We want
the drain message to be processed by the high-level API DAG. This holds true
for source SSPs and intermediate SSPs alike.
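
To make the intended behavior concrete, here is a minimal sketch (not Samza code) of the decision described above. `DrainGateSketch`, its `Action` values, and the `routeDrainThroughDag` flag are all hypothetical. The flag stands for "this task runs a high-level API DAG", which is one reading of what the condition needs to express, and it applies whether the final drained SSP is a source or an intermediate stream.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of the drain gating discussed in the review; not RunLoop itself. */
final class DrainGateSketch {
  enum Action {
    /** Other SSPs have not seen a drain message yet; keep running. */
    WAIT_FOR_OTHER_SSPS,
    /** Low-level API: all SSPs drained, so the run loop can stop the task directly. */
    STOP_TASK,
    /** High-level API: let the drain message flow through the operator DAG instead. */
    FORWARD_TO_DAG
  }

  private final Set<String> sspsToDrain;
  private final boolean routeDrainThroughDag; // assumption: true for high-level API jobs

  DrainGateSketch(Set<String> ssps, boolean routeDrainThroughDag) {
    this.sspsToDrain = new HashSet<>(ssps); // mutable copy; callers may pass Set.of(...)
    this.routeDrainThroughDag = routeDrainThroughDag;
  }

  /** Called when a drain message for the current deployment arrives on {@code ssp}. */
  Action onDrainMessage(String ssp) {
    sspsToDrain.remove(ssp);
    if (!sspsToDrain.isEmpty()) {
      return Action.WAIT_FOR_OTHER_SSPS;
    }
    // Final SSP drained: only stop the task here when no DAG needs to see the message.
    return routeDrainThroughDag ? Action.FORWARD_TO_DAG : Action.STOP_TASK;
  }
}
```

For example, `new DrainGateSketch(Set.of("a", "b"), false)` returns `WAIT_FOR_OTHER_SSPS` for `"a"` and then `STOP_TASK` for `"b"`, while the same sequence with the flag set to `true` ends in `FORWARD_TO_DAG`.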

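Separately, the `elasticityFactor > 1` branch in the diff removes the processing SSP whose system stream and partition match the envelope's, regardless of keyBucket. A standalone sketch of that matching follows; `SspKey` is a hypothetical stand-in for `SystemStreamPartition` and `ElasticDrainSketch` is an illustrative name (records require Java 16+).

```java
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for SystemStreamPartition; not the Samza class. */
record SspKey(String systemStream, int partition, int keyBucket) {}

final class ElasticDrainSketch {
  /**
   * Removes the first SSP in {@code toDrain} whose system stream and partition match
   * {@code fromEnvelope}, ignoring keyBucket, mirroring the filter/findFirst pattern in
   * the diff. Returns the removed SSP, or empty if nothing matched.
   */
  static Optional<SspKey> removeIgnoringKeyBucket(Set<SspKey> toDrain, SspKey fromEnvelope) {
    Optional<SspKey> match = toDrain.stream()
        .filter(s -> s.systemStream().equals(fromEnvelope.systemStream())
            && s.partition() == fromEnvelope.partition())
        .findFirst(); // terminal op: the stream is fully consumed before we mutate the set
    match.ifPresent(toDrain::remove);
    return match;
  }
}
```

Because `findFirst()` terminates the stream before `ifPresent` runs, removing from the set afterwards does not risk a `ConcurrentModificationException`, which is also why the pattern in the diff is safe.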


-- 
This is an automated message from the Apache Git Service.