rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r814165532
##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -335,7 +353,19 @@ class SystemConsumers (
while (sspAndEnvelopeIterator.hasNext) {
val sspAndEnvelope = sspAndEnvelopeIterator.next
val systemStreamPartition = sspAndEnvelope.getKey
- val envelopes = new ArrayDeque(sspAndEnvelope.getValue)
+ val filtered_envelopes = new util.ArrayList[IncomingMessageEnvelope](sspAndEnvelope.getValue)
+ // filter out all the envelopes with SSP not registered with this SystemConsumers
+ // with elasticity enabled, SSP of the envelope will be the SSP with KeyBucket
+ // and hence will filter out envelopes if their key bucket is not registered
+ // without elasticity, there are no key buckets
+ // and hence full SSP is registered and all envelopes of the SSP will be retained
+ filtered_envelopes.removeIf {
+ new Predicate[IncomingMessageEnvelope] {
+ def test(envelope: IncomingMessageEnvelope): Boolean =
+ !sspKeyBucketsRegistered.contains(envelope.getSystemStreamPartition(elasticityFactor))
+ }
+ }
+ val envelopes = new ArrayDeque[IncomingMessageEnvelope](filtered_envelopes)
Review comment:
Then why does RunLoop need to decrement the metric when we have
filtering here?