lakshmi-manasa-g commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r819043852



##########
File path: samza-core/src/main/java/org/apache/samza/container/RunLoop.java
##########
@@ -276,6 +276,9 @@ private void runTasks(IncomingMessageEnvelope envelope) {
           consumerMultiplexer.tryUpdate(envelope.getSystemStreamPartition(elasticityFactor));
           log.trace("updating the system consumers for ssp keyBucket {} not processed by this runloop",
               envelope.getSystemStreamPartition(elasticityFactor));
+          // since this envelope is not processed by the container, need to decrement the # envelopes metric
+          // # envelopes metric was incremented when the envelope was returned by the SystemConsumers
+          containerMetrics.envelopes().dec();

Review comment:
       added
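The accounting described in this hunk works as follows. SystemConsumers increments the envelopes counter for every envelope it returns. The run loop then undoes that increment for envelopes whose key bucket it does not own. Below is a minimal, hypothetical Java sketch of that bookkeeping. `EnvelopeMetricSketch`, `choose`, `runTasks` and the `AtomicLong` counter are stand-ins invented for illustration. They are not Samza's RunLoop, SystemConsumers or metrics API.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch only; not Samza's RunLoop, SystemConsumers or SamzaContainerMetrics.
class EnvelopeMetricSketch {
    // Stand-in for the container's "envelopes" counter (assumption: only inc/dec semantics matter).
    static final AtomicLong envelopes = new AtomicLong();

    // Stand-in for SystemConsumers returning an envelope: every returned envelope bumps the counter.
    static int choose(int keyBucket) {
        envelopes.incrementAndGet();
        return keyBucket;
    }

    // Stand-in for the run loop: an envelope whose key bucket is not owned here is
    // handed back to the consumer, so the earlier increment is undone.
    static void runTasks(int keyBucket, Set<Integer> ownedKeyBuckets) {
        if (!ownedKeyBuckets.contains(keyBucket)) {
            envelopes.decrementAndGet();
            return;
        }
        // ... process the envelope ...
    }

    public static void main(String[] args) {
        Set<Integer> owned = Set.of(0); // this run loop owns key bucket 0 only
        int[] incomingKeyBuckets = {0, 1, 0, 1, 1};
        for (int kb : incomingKeyBuckets) {
            runTasks(choose(kb), owned);
        }
        System.out.println("envelopes = " + envelopes.get()); // envelopes = 2
    }
}
```

Five envelopes arrive and three of them belong to key bucket 1, which this run loop does not own. The counter therefore ends at 2, matching the number of envelopes actually processed.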

##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
    * Clock can be used to inject a custom clock when mocking this class in
    * tests. The default implementation returns the current system clock time.
    */
-  val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+  val clock: () => Long = () => System.nanoTime(),
+
+  val elasticityFactor: Int = 1) extends Logging with TimerUtil {
 
   /**
    * Mapping from the {@see SystemStreamPartition} to the registered offsets.
    */
   private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
 
+  /**
+   * Set of all the SystemStreamPartitions registered with this SystemConsumers
+   * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+   * Without elasticity, there are no key buckets and hence is the full SSP
+   */
+  private val sspKeyBucketsRegistered = new HashSet[SystemStreamPartition] ()
+

Review comment:
       This involves reworking previous elasticity PRs, so I prefer doing it as a separate PR rather than in this one. Tracking it in SAMZA-2726.
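The proposed `sspKeyBucketsRegistered` set, deferred to SAMZA-2726, tracks registrations at key-bucket granularity. Below is a minimal sketch of that idea. It assumes a simplified SSP type (`Ssp`) and a hypothetical bucketing rule, `floorMod(key.hashCode(), elasticityFactor)`. Neither is Samza's actual `SystemStreamPartition` or its key-bucket computation. With `elasticityFactor = 1`, every key falls into bucket 0, which stands in for the full SSP.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; the Ssp type and the bucketing rule are assumptions, not Samza's implementation.
class KeyBucketRegistrySketch {
    // Simplified stand-in for SystemStreamPartition; keyBucket is always 0 when elasticityFactor == 1.
    record Ssp(String system, String stream, int partition, int keyBucket) {}

    final int elasticityFactor;
    final Set<Ssp> registered = new HashSet<>();

    KeyBucketRegistrySketch(int elasticityFactor) {
        if (elasticityFactor < 1) {
            throw new IllegalArgumentException("elasticityFactor must be >= 1");
        }
        this.elasticityFactor = elasticityFactor;
    }

    void register(Ssp ssp) {
        registered.add(ssp);
    }

    // Assumed bucketing rule: hash the message key into [0, elasticityFactor).
    Ssp sspForKey(String system, String stream, int partition, Object key) {
        int bucket = Math.floorMod(key.hashCode(), elasticityFactor);
        return new Ssp(system, stream, partition, bucket);
    }

    boolean isRegistered(Ssp ssp) {
        return registered.contains(ssp);
    }

    public static void main(String[] args) {
        // Without elasticity, registering bucket 0 covers every key of the partition.
        KeyBucketRegistrySketch noElasticity = new KeyBucketRegistrySketch(1);
        noElasticity.register(new Ssp("kafka", "orders", 0, 0));
        System.out.println(noElasticity.isRegistered(noElasticity.sspForKey("kafka", "orders", 0, 5))); // true

        // With elasticityFactor 2, only keys hashing to bucket 0 are covered.
        KeyBucketRegistrySketch elastic = new KeyBucketRegistrySketch(2);
        elastic.register(new Ssp("kafka", "orders", 0, 0));
        System.out.println(elastic.isRegistered(elastic.sspForKey("kafka", "orders", 0, 4))); // true
        System.out.println(elastic.isRegistered(elastic.sspForKey("kafka", "orders", 0, 5))); // false
    }
}
```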

##########
File path: samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -335,7 +353,19 @@ class SystemConsumers (
       while (sspAndEnvelopeIterator.hasNext) {
         val sspAndEnvelope = sspAndEnvelopeIterator.next
         val systemStreamPartition = sspAndEnvelope.getKey
-        val envelopes = new ArrayDeque(sspAndEnvelope.getValue)
+        val filtered_envelopes = new util.ArrayList[IncomingMessageEnvelope](sspAndEnvelope.getValue)
+        // filter out all the envelopes with SSP not registered with this SystemConsumers
+        // with elasticity enabled, SSP of the envelope will be the SSP with KeyBucket
+        // and hence will filter out envelopes if their key bucket is not registered
+        // without elasticity, there are no key buckets
+        // and hence full SSP is registered and all envelopes of the SSP will be retained
+        filtered_envelopes.removeIf {
+          new Predicate[IncomingMessageEnvelope] {
+            def test(envelope: IncomingMessageEnvelope): Boolean =
+              !sspKeyBucketsRegistered.contains(envelope.getSystemStreamPartition(elasticityFactor))
+          }
+        }
+        val envelopes = new ArrayDeque[IncomingMessageEnvelope](filtered_envelopes)

Review comment:
       Removing this filtering logic since it is an optimization that needs more investigation. Created SAMZA-2724 for this piece.
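For reference, the filtering step removed from this PR (tracked as SAMZA-2724) can be sketched in Java as shown below. The `Envelope` record is a hypothetical stand-in for `IncomingMessageEnvelope`. It carries only an offset and a precomputed key bucket. The lambda passed to `removeIf` plays the role of the anonymous `Predicate` in the Scala diff.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the removed filter; Envelope is not Samza's IncomingMessageEnvelope.
class EnvelopeFilterSketch {
    // Simplified stand-in for an envelope: only its offset and key bucket matter here.
    record Envelope(String offset, int keyBucket) {}

    // Drop envelopes whose key bucket is not registered, then queue the rest in fetch order.
    static ArrayDeque<Envelope> filterRegistered(List<Envelope> fetched, Set<Integer> registeredKeyBuckets) {
        // Copy first: the fetched batch may be unmodifiable (List.of is), so removeIf must run on a copy.
        List<Envelope> filtered = new ArrayList<>(fetched);
        filtered.removeIf(e -> !registeredKeyBuckets.contains(e.keyBucket()));
        return new ArrayDeque<>(filtered);
    }

    public static void main(String[] args) {
        List<Envelope> fetched = List.of(
            new Envelope("10", 0), new Envelope("11", 1), new Envelope("12", 0));
        ArrayDeque<Envelope> queued = filterRegistered(fetched, Set.of(0));
        System.out.println(queued.size());               // 2
        System.out.println(queued.peekFirst().offset()); // 10
    }
}
```

In Scala 2.12 and later, the anonymous `Predicate` in the diff could likewise be written as a lambda, because Scala converts function literals to Java functional interfaces (SAM conversion).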




-- 
This is an automated message from the Apache Git Service.