Hi Yi,

Thanks for your feedback. Responses are inline.
> It seems like we can deprecate the whole BalancingTaskNameGrouper altogether.

- Yes, that's part of the proposed interface changes.

> That also means that you will somehow store the task-to-container mapping info in the locality znode as well. It would be nice to make it clear how the task-to-container-to-physical-resource mapping is stored and read in ZK.

- The task-to-processor assignment is already stored in the recent version of the JobModel in ZooKeeper. I don't see the value in duplicating it in the locality znode as well, since that would burden us with keeping the same data consistent in two places. Instead, I want to derive the container-to-locationId mapping from the task-to-locationId mapping in the locality ZooKeeper node and the container-to-task mapping in the latest JobModel. When new processors join, they will not have any previous task assignment. Any new tasks added by changing the SSPGrouper will not have a previously assigned host, and will be open to distribution to any available processor in the group. Please share your thoughts; I will update the proposal if there's consensus on this.

> Why are we missing a processor-to-locationId mapping in the zknode data model?

- I plan to derive it from the task-to-locationId mapping in the locality ZooKeeper node and the container-to-taskId mapping in the latest JobModel.

> Also needs to include compatibility test after deprecating/changing TaskNameGrouper API, make sure the default behavior of default groupers is the same.

- Added it to the compatibility section.

> In Compatibility section, remove the future versions (i.e. 0.15) since we are not sure when this change can be completed and released yet. It seems that we are not changing the persistent data format for locality info in coordinator stream. Make it explicit.

- Updated the compatibility section.

> If you are making LocalityManager class an interface, are you planning to make it pluggable as well?
> Actually, I was thinking that the model we want to go with is to make the metadata store for locality info a pluggable interface, while keeping LocalityManager as a common implementation.

- Yes, LocalityManager is a pluggable interface (there will be two implementations: one for the coordinator stream and one for ZooKeeper). I think you're proposing the same thing as my change, but with a different interface name (MetaStore instead of LocalityManager). I don't think there will be any value in the LocalityManager class at all once we have the meta-store interface.

> What's the definition of LocationId? An interface? An abstract class?

- It's a unique identifier that represents the virtual container/physical host (any physical execution environment) in which a processor runs. It's a pluggable interface (much like ProcessorIdGenerator). In YARN, the physical hostname will be used as the locationId. In standalone, a combination of slice-name, slice-id, and instance-id will be used as the locationId.

> For semantics of host affinity with run.id, "all stream processors of a samza application stop" = one run is not true for YARN-deployed Samza applications.

- Updated the proposal based on this feedback.

> Lastly, from the grouper API definition, you will not be able to get the physical location info if it is not passed in via currentGenerationProcessorIds or ContainerModel. How are you going to resolve that without creating a LocalityManager in the Grouper implementation class? I would strongly recommend not creating an instance of LocalityManager in the Grouper implementation class.

- LocationId is part of the ContainerModel class and will be used to propagate the previous run's locality information.

> If the input topic partitions change, or the SSP grouper changes, the task-to-SSP mapping changes and the task locality may not make sense at all. Is this considered out of scope?

- SystemStreamPartitionCountMonitor is out of scope for this proposal.
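Going back to the LocationId question above, here is a rough sketch of how the pluggable provider could look. These names (LocationIdProvider and its implementations) are illustrative only, not the proposed Samza API:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical sketch of pluggable locationId generation; none of these
// names are the final API.
interface LocationIdProvider {
    String getLocationId();
}

// YARN: the physical hostname identifies the execution environment.
class YarnLocationIdProvider implements LocationIdProvider {
    public String getLocationId() {
        try {
            return InetAddress.getLocalHost().getHostName();
        } catch (UnknownHostException e) {
            throw new IllegalStateException("Could not resolve hostname", e);
        }
    }
}

// Standalone: slice-name, slice-id, and instance-id together identify the
// virtual container the processor runs in.
class SliceLocationIdProvider implements LocationIdProvider {
    private final String sliceName;
    private final String sliceId;
    private final String instanceId;

    SliceLocationIdProvider(String sliceName, String sliceId, String instanceId) {
        this.sliceName = sliceName;
        this.sliceId = sliceId;
        this.instanceId = instanceId;
    }

    public String getLocationId() {
        return sliceName + "." + sliceId + "." + instanceId;
    }
}

public class LocationIdDemo {
    public static void main(String[] args) {
        LocationIdProvider provider =
            new SliceLocationIdProvider("slice-a", "0", "instance-1");
        System.out.println(provider.getLocationId()); // slice-a.0.instance-1
    }
}
```

The point of the interface is that groupers and the JobModel only ever see an opaque locationId string; how it is generated stays environment-specific.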
I think we can determine an input-stream partition change by comparing the previous JobModel version against the current partition counts of the input streams, and purge the task locality information when they differ. After SEP-5, the existing previous task locality information can be reused.

> It's not quite clear to me what the distributed barrier is used for in the graph. For every container process to pick up a certain version of JobModel? Who is waiting on the barrier? The leader or the followers? Or everyone?

- Both the leader and the followers wait on the barrier to agree upon a JobModel. Will add that to the state diagram.

Thanks.
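PS: to make the container-to-locationId derivation discussed above concrete, here is a rough sketch. Plain maps stand in for the real JobModel/ContainerModel types, and all names here are illustrative, not the proposed API:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only; real code would read the maps from the
// locality znode and the latest JobModel.
public class LocalityDerivation {
    // Derives each container's previous locationId from the
    // task-to-locationId map (locality znode) and the container-to-task
    // assignment (latest JobModel).
    public static Map<String, String> containerToLocation(
            Map<String, Set<String>> containerToTasks,
            Map<String, String> taskToLocation) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, Set<String>> entry : containerToTasks.entrySet()) {
            for (String task : entry.getValue()) {
                String location = taskToLocation.get(task);
                if (location != null) {
                    // All tasks of one container ran on the same host in the
                    // previous run, so any task's location identifies it.
                    result.put(entry.getKey(), location);
                    break;
                }
            }
            // Containers whose tasks are all new get no entry: they are free
            // to be placed on any available processor in the group.
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Set<String>> containerToTasks = new HashMap<>();
        containerToTasks.put("container-0",
            new HashSet<>(Arrays.asList("task-0", "task-1")));
        containerToTasks.put("container-1",
            new HashSet<>(Collections.singleton("task-2")));
        Map<String, String> taskToLocation = new HashMap<>();
        taskToLocation.put("task-0", "host-a");
        taskToLocation.put("task-1", "host-a");
        // task-2 is new and has no previous locality.
        System.out.println(containerToLocation(containerToTasks, taskToLocation));
    }
}
```

This is also why no separate processor-to-locationId mapping needs to be persisted: it falls out of the two maps we already have.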