rayman7718 commented on a change in pull request #1580:
URL: https://github.com/apache/samza/pull/1580#discussion_r809604368
##########
File path: samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
##########
@@ -254,6 +257,29 @@ void registerInputStream(SystemStream input) {
return Collections.emptyList();
}
+  /**
+   * returns true if current task should broadcast control message (end of stream/watermark) to others
+   * if elasticity is not enabled (elasticity factor <= 1) then the current task is eligible
+   * if elasticity is enabled, pick the elastic task consuming keybucket = 0 of the ssp as the eligible task
+   * @param ssp ssp that the current task consumes
+   * @return true if current task is eligible to broadcast control messages
+   */
+  private boolean shouldTaskBroadcastToOtherPartitions(SystemStreamPartition ssp) {
+    if (elasticityFactor <= 1) {
+      return true;
+    }
+
+    // if elasticity is enabled then taskModel actually has ssp with keybuckets in it
+    // check if this current elastic task processes the first keybucket (=0) of the ssp given
+    return
+        taskModel.getSystemStreamPartitions().stream()
+            .filter(sspInModel ->
+                ssp.getSystemStream().equals(sspInModel.getSystemStream()) // ensure same systemstream as ssp given
+                && ssp.getPartition().equals(sspInModel.getPartition()) // ensure same partition as ssp given
+                && sspInModel.getKeyBucket() == 0) // ensure sspInModel has keyBucket 0
+            .count() > 0; // >0 means current task consumes the keyBucket = 0 of the ssp given
Review comment:
Why not just
    return ssp.getKeyBucket() == 0;
?
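To make the question concrete, here is a minimal sketch of the two checks side by side. It does not use the real Samza classes: `Ssp` is a hypothetical, simplified stand-in for `SystemStreamPartition`, the stream name `kafka.pageviews` is made up, and the convention that a key bucket of -1 means "no key bucket attached" is an assumption for illustration. The task-model lookup is written with `anyMatch` rather than `count() > 0`, which should behave the same for this purpose.

```java
import java.util.Arrays;
import java.util.List;

final class BroadcastEligibilitySketch {

  // Hypothetical stand-in for SystemStreamPartition; not the real Samza class.
  static final class Ssp {
    final String systemStream;
    final int partition;
    final int keyBucket; // assumption: -1 means no key bucket is attached

    Ssp(String systemStream, int partition, int keyBucket) {
      this.systemStream = systemStream;
      this.partition = partition;
      this.keyBucket = keyBucket;
    }
  }

  // Roughly the check in the diff: look for a matching ssp with key bucket 0 in the task model.
  static boolean viaTaskModel(List<Ssp> taskModelSsps, Ssp ssp) {
    return taskModelSsps.stream().anyMatch(inModel ->
        inModel.systemStream.equals(ssp.systemStream)
            && inModel.partition == ssp.partition
            && inModel.keyBucket == 0);
  }

  // The shortcut asked about above: trust the key bucket on the ssp that was passed in.
  static boolean viaSspOnly(Ssp ssp) {
    return ssp.keyBucket == 0;
  }

  public static void main(String[] args) {
    List<Ssp> taskModelSsps = Arrays.asList(new Ssp("kafka.pageviews", 3, 0));
    Ssp withBucket = new Ssp("kafka.pageviews", 3, 0);
    Ssp withoutBucket = new Ssp("kafka.pageviews", 3, -1);

    // Both checks agree when the ssp passed in carries its key bucket.
    System.out.println(viaTaskModel(taskModelSsps, withBucket) + " " + viaSspOnly(withBucket));
    // They can differ when the ssp passed in has no key bucket attached.
    System.out.println(viaTaskModel(taskModelSsps, withoutBucket) + " " + viaSspOnly(withoutBucket));
  }
}
```

If the ssp handed to this method always carries the key bucket, the one-line check should be enough; if it can arrive without one, the lookup through taskModel may still be needed.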