rmatharu commented on a change in pull request #1585:
URL: https://github.com/apache/samza/pull/1585#discussion_r814164814
##########
File path:
samza-core/src/main/scala/org/apache/samza/system/SystemConsumers.scala
##########
@@ -112,13 +114,22 @@ class SystemConsumers (
* Clock can be used to inject a custom clock when mocking this class in
* tests. The default implementation returns the current system clock time.
*/
- val clock: () => Long = () => System.nanoTime()) extends Logging with TimerUtil {
+ val clock: () => Long = () => System.nanoTime(),
+
+ val elasticityFactor: Int = 1) extends Logging with TimerUtil {
/**
* Mapping from the {@see SystemStreamPartition} to the registered offsets.
*/
private val sspToRegisteredOffsets = new HashMap[SystemStreamPartition, String]()
+ /**
+ * Set of all the SystemStreamPartitions registered with this SystemConsumers
+ * With elasticity enabled, the SystemStreamPartitions are actually key buckets within a full SSP
+ * Without elasticity, there are no key buckets and hence is the full SSP
+ */
+ private val sspKeyBucketsRegistered = new HashSet[SystemStreamPartition] ()
+
Review comment:
It seems that in some places the code uses SystemStreamPartition with
keyBucket set to -1 (and calls it the "full SSP"), while in other places it
uses SystemStreamPartition with keyBucket != -1.
Would it be better to create a separate class, SystemStreamPartitionKeyBucket,
that extends SystemStreamPartition? Then we would deal with
SystemStreamPartitionKeyBucket wherever we need keyBucket != -1, and use plain
SSP wherever keyBucket is -1.
That would make the code much easier to understand and debug in the future.
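
A rough sketch of the separate-class idea, not a worked-out design. The
SystemStreamPartition below is a simplified stand-in so the snippet compiles on
its own, and the constructor shape, the fullSsp helper, and the
"keyBucket >= 0" check are assumptions made for illustration only:

```scala
import scala.collection.mutable

// Simplified stand-in for org.apache.samza.system.SystemStreamPartition,
// reduced to three fields so this sketch is self-contained.
class SystemStreamPartition(val system: String, val stream: String, val partition: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: SystemStreamPartition =>
      // Comparing runtime classes keeps a full SSP from ever equalling a key bucket.
      that.getClass == getClass &&
        system == that.system && stream == that.stream && partition == that.partition
    case _ => false
  }
  override def hashCode(): Int = (system, stream, partition).hashCode()
  override def toString: String = s"SystemStreamPartition [$system, $stream, $partition]"
}

// Hypothetical subclass: an instance always carries a real key bucket, so the
// -1 sentinel never has to be checked by callers.
class SystemStreamPartitionKeyBucket(ssp: SystemStreamPartition, val keyBucket: Int)
  extends SystemStreamPartition(ssp.system, ssp.stream, ssp.partition) {
  require(keyBucket >= 0, s"keyBucket must be non-negative, got $keyBucket")

  // The full SSP this key bucket belongs to (hypothetical helper).
  def fullSsp: SystemStreamPartition = new SystemStreamPartition(system, stream, partition)

  override def equals(other: Any): Boolean = other match {
    case that: SystemStreamPartitionKeyBucket => super.equals(that) && keyBucket == that.keyBucket
    case _ => false
  }
  override def hashCode(): Int = 31 * super.hashCode() + keyBucket
  override def toString: String = s"${super.toString} keyBucket=$keyBucket"
}

object KeyBucketSketch {
  def main(args: Array[String]): Unit = {
    val full = new SystemStreamPartition("kafka", "page-views", 0)
    // The element type now says "key buckets only"; a full SSP would not compile here.
    val registered = mutable.HashSet[SystemStreamPartitionKeyBucket]()
    registered += new SystemStreamPartitionKeyBucket(full, 0)
    registered += new SystemStreamPartitionKeyBucket(full, 1)
    registered += new SystemStreamPartitionKeyBucket(full, 1) // duplicate, ignored by the set
    assert(registered.size == 2)
    assert(registered.forall(_.fullSsp == full))
    assert(full != registered.head)
  }
}
```

With something along these lines, sspKeyBucketsRegistered could be declared as
HashSet[SystemStreamPartitionKeyBucket], and the compiler rather than a naming
convention would tell the two cases apart.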