rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809601800
##########
File path:
samza-core/src/main/java/org/apache/samza/container/grouper/stream/GroupByPartition.java
##########
@@ -52,12 +55,22 @@ public GroupByPartition(Config config) {
continue;
}
- TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
-
- if (!groupedMap.containsKey(taskName)) {
- groupedMap.put(taskName, new HashSet<>());
+ // if elasticity factor > 1 then elasticity is enabled
+ // for each partition create ElasticityFactor number of tasks
+ // i.e; result will have number of tasks = ElasticityFactor X number of partitions
+ // each task will have name Partition_X_Y where X is the partition number and Y <= elasticityFactor
+ // each task Partition_X_Y consumes the keyBucket Y of all the SSP with partition number X.
+ // #todo: add the elasticity ticket for more details?
+ if (elasticityFactor > 1) {
+ for (int i = 0; i < elasticityFactor; i++) {
+ TaskName taskName = new TaskName(String.format("Partition %d %d", ssp.getPartition().getPartitionId(), i));
+ SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, i);
+ addToTaskNameSSPMap(groupedMap, taskName, sspWithKeyBucket);
+ }
+ } else {
+ TaskName taskName = new TaskName(String.format("Partition %d", ssp.getPartition().getPartitionId()));
+ addToTaskNameSSPMap(groupedMap, taskName, ssp);
Review comment:
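Also, this can be simplified into:
`for (int i = 0; i < elasticityFactor; i++) {
int partitionId = ssp.getPartition().getPartitionId();
// format the task name here; the key-bucket suffix is only used when elasticity is enabled
String taskNameStr = elasticityFactor == 1 ? String.format("Partition %d", partitionId) : String.format("Partition %d %d", partitionId, i);
TaskName taskName = new TaskName(taskNameStr);
int keyBucket = elasticityFactor == 1 ? -1 : i;
SystemStreamPartition sspWithKeyBucket = new SystemStreamPartition(ssp, keyBucket);
groupedMap.putIfAbsent(taskName, new HashSet<>());
groupedMap.get(taskName).add(sspWithKeyBucket);
}`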
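
To check that a single loop really covers both the elastic and non-elastic cases, here is a minimal standalone sketch. It does not use the Samza classes: `ElasticGroupingSketch`, `Ssp`, and `group` are hypothetical stand-ins, task names are plain strings, and it assumes a key bucket of -1 means "no key bucket" and that `elasticityFactor` is always at least 1.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-ins for the Samza types; not the real API.
final class ElasticGroupingSketch {

  /** Simplified stand-in for SystemStreamPartition. */
  static final class Ssp {
    final String stream;
    final int partitionId;
    final int keyBucket; // assumption: -1 means elasticity is disabled

    Ssp(String stream, int partitionId, int keyBucket) {
      this.stream = stream;
      this.partitionId = partitionId;
      this.keyBucket = keyBucket;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Ssp)) {
        return false;
      }
      Ssp other = (Ssp) o;
      return stream.equals(other.stream)
          && partitionId == other.partitionId
          && keyBucket == other.keyBucket;
    }

    @Override
    public int hashCode() {
      return Objects.hash(stream, partitionId, keyBucket);
    }
  }

  /** Groups SSPs by partition, creating elasticityFactor tasks per partition. */
  static Map<String, Set<Ssp>> group(List<Ssp> ssps, int elasticityFactor) {
    if (elasticityFactor < 1) {
      throw new IllegalArgumentException("elasticityFactor must be >= 1");
    }
    boolean elastic = elasticityFactor > 1;
    Map<String, Set<Ssp>> grouped = new LinkedHashMap<>();
    for (Ssp ssp : ssps) {
      for (int i = 0; i < elasticityFactor; i++) {
        // The key-bucket suffix appears only when elasticity is enabled.
        String taskName = elastic
            ? String.format("Partition %d %d", ssp.partitionId, i)
            : String.format("Partition %d", ssp.partitionId);
        int keyBucket = elastic ? i : -1;
        grouped.computeIfAbsent(taskName, k -> new HashSet<>())
            .add(new Ssp(ssp.stream, ssp.partitionId, keyBucket));
      }
    }
    return grouped;
  }

  public static void main(String[] args) {
    List<Ssp> input = new ArrayList<>();
    input.add(new Ssp("clicks", 0, -1));
    input.add(new Ssp("views", 0, -1));
    input.add(new Ssp("clicks", 1, -1));
    System.out.println(group(input, 1).keySet());
    System.out.println(group(input, 2).keySet());
  }
}
```

With the three sample SSPs above, a factor of 1 gives two tasks (one per partition) and a factor of 2 gives four. One thing to watch in the real change: the loop form silently produces no tasks if `elasticityFactor` is ever 0 or negative, whereas the `if/else` in the diff falls back to the non-elastic path, so the config value should probably be validated either way.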