ajothomas commented on code in PR #1639:
URL: https://github.com/apache/samza/pull/1639#discussion_r1013123456


##########
samza-core/src/main/java/org/apache/samza/container/RunLoop.java:
##########
@@ -875,49 +876,40 @@ private boolean shouldDrain() {
         return false;
       }
 
-      if (!pendingEnvelopeQueue.isEmpty()) {
-        PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
-        IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
+      if (pendingEnvelopeQueue.size() > 0) {
+        final PendingEnvelope pendingEnvelope = pendingEnvelopeQueue.peek();
+        final IncomingMessageEnvelope envelope = pendingEnvelope.envelope;
 
         if (envelope.isDrain()) {
           final DrainMessage message = (DrainMessage) envelope.getMessage();
           if (!message.getRunId().equals(runId)) {
-            // Removing the drain message from the pending queue as it doesn't match with the current runId
-            // Removing it will ensure that it is not picked up by process()
-            pendingEnvelopeQueue.remove();
+            // Removing the drain message from the pending queue as it doesn't match with the current deployment
+            final PendingEnvelope discardedDrainMessage = pendingEnvelopeQueue.remove();
+            consumerMultiplexer.tryUpdate(discardedDrainMessage.envelope.getSystemStreamPartition());
           } else {
+            // Found drain message matching the current deployment
+
             // set the RunLoop to drain mode
             if (!isDraining) {
               drain();
             }
 
-            if (elasticityFactor <= 1) {
-              SystemStreamPartition ssp = envelope.getSystemStreamPartition();
-              processingSspSetToDrain.remove(ssp);
-            } else {
-              // SystemConsumers will write only one envelope (enclosing DrainMessage) per SSP in its buffer.
-              // This envelope doesn't have keybucket info it's SSP. With elasticity, the same SSP can be processed by
-              // multiple tasks. Therefore, if envelope contains drain message, the ssp of envelope should be removed
-              // from task's processing set irrespective of keyBucket.
-              SystemStreamPartition sspOfEnvelope = envelope.getSystemStreamPartition();
-              Optional<SystemStreamPartition> ssp = processingSspSetToDrain.stream()
-                  .filter(sspInSet -> sspInSet.getSystemStream().equals(sspOfEnvelope.getSystemStream())
-                      && sspInSet.getPartition().equals(sspOfEnvelope.getPartition()))
-                  .findFirst();
-              ssp.ifPresent(processingSspSetToDrain::remove);
-            }
 
             if (!hasIntermediateStreams) {
-              // Don't remove from the pending queue as we want the DAG to pick up Drain message and propagate it to
-              // intermediate streams
+              // The flow below only applies to samza low-level API
+
+              // For high-level API, we do not remove the message from pending queue.
+              // It will be picked by the process flow instead of drain flow, as we want the drain control message
+              // to be processed by the High-level API Operator DAG.
+
+              processingSspSetToDrain.remove(envelope.getSystemStreamPartition());
               pendingEnvelopeQueue.remove();
+              return processingSspSetToDrain.isEmpty();

Review Comment:
   Changed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to