ajothomas commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1013123301


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't 
match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't 
match with the current deployment
+            final PendingEnvelope discardedDrainMessage = 
pendingEnvelopeQueue.remove();
+            
consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing 
DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With 
elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain 
message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = 
envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = 
processingSspSetToDrain.stream()
-                  .filter(sspInSet -> 
sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && 
sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {

Review Comment:
   Changed this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to