lakshmi-manasa-g commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r810299062
##########
File path:
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -52,12 +55,22 @@ public GroupByPartition(Config config) {
continue;
}
- TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
-
- if (!groupedMap.containsKey(taskName)) {
- groupedMap.put(taskName, new HashSet<>());
+ // if elasticity factor > 1 then elasticity is enabled
+ // for each partition create ElasticityFactor number of tasks
+ // i.e; result will have number of tasks = ElasticityFactor X number of partitions
+ // each task will have name Partition_X_Y where X is the partition number and Y <= elasticityFactor
+ // each task Partition_X_Y consumes the keyBucket Y of all the SSP with partition number X.
+ // #todo: add the elasticity ticket for more details?
+ if (elasticityFactor > 1) {
+ for (int i = 0; i < elasticityFactor; i++) {
+ TaskName taskName = new TaskName(String.format("Partition %d %d", ssp.getPartition().getPartitionId(), i));
+ SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, i);
+ addToTaskNameSSPMap(groupedMap, taskName, sspWithKeyBucket);
+ }
+ } else {
+ TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
+ addToTaskNameSSPMap(groupedMap, taskName, ssp);
Review comment:
This requires elasticityFactor to be >= 1. I updated JobConfig to enforce
this.
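
For illustration, the grouping in this hunk can be sketched outside Samza. This is a minimal sketch, not the PR's code: it uses plain strings in place of TaskName and SystemStreamPartition, the class and method names are hypothetical, and the argument check stands in for the validation that the comment above says now lives in JobConfig.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the grouper; not Samza's GroupByPartition.
class ElasticGroupingSketch {

  // Maps task name -> the SSPs (rendered as strings) that the task consumes.
  // With elasticityFactor > 1, each partition is split into one task per key bucket.
  static Map<String, Set<String>> group(List<String> streams, int partitions, int elasticityFactor) {
    if (elasticityFactor < 1) {
      // Assumption: mirrors the ">= 1" requirement that the PR enforces in JobConfig.
      throw new IllegalArgumentException("elasticityFactor must be >= 1");
    }
    Map<String, Set<String>> grouped = new LinkedHashMap<>();
    for (String stream : streams) {
      for (int p = 0; p < partitions; p++) {
        if (elasticityFactor > 1) {
          // Buckets run from 0 to elasticityFactor - 1, as in the loop in the diff.
          for (int bucket = 0; bucket < elasticityFactor; bucket++) {
            String task = String.format("Partition %d %d", p, bucket);
            grouped.computeIfAbsent(task, k -> new LinkedHashSet<>())
                .add(stream + "/" + p + "/bucket" + bucket);
          }
        } else {
          String task = String.format("Partition %d", p);
          grouped.computeIfAbsent(task, k -> new LinkedHashSet<>())
              .add(stream + "/" + p);
        }
      }
    }
    return grouped;
  }

  public static void main(String[] args) {
    // Two streams, two partitions, factor 2: four tasks, each holding one bucket of both streams.
    group(Arrays.asList("orders", "clicks"), 2, 2)
        .forEach((task, ssps) -> System.out.println(task + " -> " + ssps));
  }
}
```

With a factor of 2, the number of tasks is twice the number of partitions, and each task holds the same key bucket of every stream for its partition.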
##########
File path:
samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
##########
@@ -254,6 +257,29 @@ void registerInputStream(SystemStream input) {
return Collections.emptyList();
}
+ /**
+ * returns true if current task should broadcast control message (end of stream/watermark) to others
+ * if elasticity is not enabled (elasticity factor <=1 ) then the current task is eligible
+ * if elastiicty is enabled, pick the elastic task consuming keybucket = 0 of the ssp as the eligible task
+ * @param ssp ssp that the current task consumes
+ * @return true if current task is eligible to broadcast control messages
+ */
+ private boolean shouldTaskBroadcastToOtherPartitions(SystemStreamPartition ssp) {
+ if (elasticityFactor <= 1) {
+ return true;
+ }
+
+ // if elasticity is enabled then taskModel actually has ssp with keybuckets in it
+ // check if this current elastic task processes the first keybucket (=0) of the ssp given
+ return
+ taskModel.getSystemStreamPartitions().stream()
+ .filter(sspInModel ->
+ ssp.getSystemStream().equals(sspInModel.getSystemStream()) // ensure same systemstream as ssp given
+ && ssp.getPartition().equals(sspInModel.getPartition()) // ensure same partition as ssp given
+ && sspInModel.getKeyBucket() == 0) // ensure sspInModel has keyBucket 0
+ .count() > 0; // >0 means current task consumes the keyBucket = 0 of the ssp given
Review comment:
No, because a task could be consuming a different SSP with keyBucket = 0. We
need to ensure that it consumes keyBucket = 0 of the particular SSP that it
consumes AND for which it was aggregating end-of-stream/watermark messages.
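
To make the distinction concrete, here is a minimal sketch under stated assumptions: the Ssp class and method names are hypothetical stand-ins rather than Samza types, and the mixed-bucket task in main is a constructed case chosen only to show why checking for "any SSP with keyBucket 0" would not be enough. It uses anyMatch, which is equivalent to the filter-and-count check in the diff.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in types; not Samza's OperatorImpl or SystemStreamPartition.
class BroadcastEligibilitySketch {

  // Minimal SSP: stream name, partition id, and key bucket.
  static final class Ssp {
    final String stream;
    final int partition;
    final int keyBucket;

    Ssp(String stream, int partition, int keyBucket) {
      this.stream = stream;
      this.partition = partition;
      this.keyBucket = keyBucket;
    }
  }

  // True if the task may broadcast control messages for the given stream and partition.
  static boolean shouldBroadcast(List<Ssp> taskSsps, String stream, int partition, int elasticityFactor) {
    if (elasticityFactor <= 1) {
      return true; // elasticity disabled: every task is eligible
    }
    // Eligible only if the task consumes key bucket 0 of this exact stream and partition.
    return taskSsps.stream().anyMatch(inModel ->
        inModel.stream.equals(stream)
            && inModel.partition == partition
            && inModel.keyBucket == 0);
  }

  public static void main(String[] args) {
    // Constructed case: bucket 1 of orders/0 and bucket 0 of clicks/0 in the same task.
    List<Ssp> task = Arrays.asList(new Ssp("orders", 0, 1), new Ssp("clicks", 0, 0));
    System.out.println(shouldBroadcast(task, "orders", 0, 2));
    System.out.println(shouldBroadcast(task, "clicks", 0, 2));
  }
}
```

In this sketch the task holds a bucket-0 SSP for clicks but is still not eligible to broadcast for orders, which is the case the stream and partition comparisons guard against.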