rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r814165532



##########
File path: 
samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -335,7 +353,19 @@ class SystemConsumers (
       while (sspAndEnvelopeIterator.hasNext) {
         val sspAndEnvelope = sspAndEnvelopeIterator.next
         val systemStreamPartition = sspAndEnvelope.getKey
-        val envelopes = new ArrayDeque(sspAndEnvelope.getValue)
+        val filtered_envelopes = new 
util.ArrayList[IncomingMessageEnvelope](sspAndEnvelope.getValue)
+        // filter out all the envelopes with SSP not registered with this 
SystemConsumers
+        // with elasticity enabled, SSP of the envelope will be the SSP with 
KeyBucket
+        // and hence will filter out envelopes if their key bucket is not 
registered
+        // without elasticity, there are no key buckets
+        // and hence full SSP is registered and all envelopes of the SSP will 
be retained
+        filtered_envelopes.removeIf {
+          new Predicate[IncomingMessageEnvelope] {
+            def test(envelope: IncomingMessageEnvelope): Boolean =
+              
!sspKeyBucketsRegistered.contains(envelope.getSystemStreamPartition(elasticityFactor))
+          }
+        }
+        val envelopes = new 
ArrayDeque[IncomingMessageEnvelope](filtered_envelopes)

Review comment:
       Then why does RunLoop need to decrement the metric when we have 
filtering here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to