Hi Yi,

Thanks for your feedback. Responses inline.


> It seems like we can deprecate the whole BalancingTaskNameGrouper
altogether.

    - Yes, that’s part of the proposed interface changes.

>  That also means that you will somehow store the task-to-container
mapping info in the locality znode as well. It would be nice to make it
clear how the task-to-container-to-physical-resource mapping is stored and
read in ZK.

    - I think the task assignment (task-to-processor) mapping is stored
in the most recent version of the JobModel in ZooKeeper. I don't see the
value in duplicating it in the locality znode as well (which would
burden us with keeping the same data consistent in two places). I want
to derive the container-to-locationId mapping from the task-to-locationId
mapping in the locality ZooKeeper node and the container-to-task mapping
available in the latest JobModel; see the sketch below. When new
processors join, they will not have any previous task assignment. Any
new tasks added by changing the SSPGrouper will not have a previous host
assigned (they will be open to distribution to any available processor
in the group). Please share your thoughts. I will update the proposal if
there's consensus on this.
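
A minimal sketch of that derivation, assuming Samza's TaskName and
ContainerModel plus the proposed LocationId type (accessor names like
getProcessorId() are illustrative, not final):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.container.TaskName;
    import org.apache.samza.job.model.ContainerModel;

    public class ContainerLocalityDeriver {
      public Map<String, LocationId> derive(
          Map<TaskName, LocationId> taskLocality,        // from the locality znode
          Map<String, ContainerModel> containerModels) { // from the latest JobModel
        Map<String, LocationId> containerLocality = new HashMap<>();
        for (ContainerModel container : containerModels.values()) {
          for (TaskName task : container.getTasks().keySet()) {
            LocationId location = taskLocality.get(task);
            if (location != null) {
              // All tasks of a container ran together in the previous run, so
              // the first task with a known location fixes the container's one.
              containerLocality.putIfAbsent(container.getProcessorId(), location);
            }
          }
        }
        return containerLocality;
      }
    }

New processors and new tasks simply never appear in taskLocality, so
they end up with no location and stay open to any processor in the
group.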


> Why are we missing a processor-to-locationId mapping in the zknode data
model?

    - Planning to derive it from the task-to-locationId mapping in the
locality ZooKeeper node and the container-to-task mapping from the
latest JobModel (see the sketch above).


>  Also needs to include compatibility test after deprecating/changing
TaskNameGrouper API, make sure the default behavior of default groupers is
the same.

    - Added it to the compatibility section.


>  In Compatibility section, remove the future versions (i.e. 0.15) since
we are not sure when this change can be completed and released yet. It
seems that we are not changing the persistent data format for locality info
in coordinator stream. Make it explicit.

    - Updated the compatibility section.

>  If you are making LocalityManager class an interface, are you planning
to make it pluggable as well? Actually, I was thinking that the model we
wanted to go is that making the metadata store for locality info an
interface and pluggable, while keep the LocalityManager as a common
implementation.

    - Yes, LocalityManager is a pluggable interface (there will be two
implementations, one for the coordinator stream and the other for
ZooKeeper). I think you're proposing the same thing as my change, but
with a different interface name (MetaStore instead of LocalityManager).
I don't think there will be any value in the LocalityManager class at
all once we have the meta-store interface; a rough sketch is below.
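
For concreteness, a rough sketch of that meta-store interface (the
names here are placeholders, not a final API):

    import java.util.Map;
    import org.apache.samza.container.TaskName;

    public interface LocalityMetaStore {
      // All task-to-location assignments persisted by the previous run.
      Map<TaskName, LocationId> readTaskLocality();

      // Record where the given task is currently running.
      void writeTaskLocality(TaskName task, LocationId locationId);
    }

The two implementations would be coordinator-stream backed (YARN) and
locality-znode backed (standalone), chosen by the deployment model.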

>  what’s the definition of LocationId? An interface? An abstract class?
    - It’s a unique identifier which represents a virtual container or
physical host (any physical execution environment) in which a processor
runs. It’s a pluggable interface (more like ProcessorIdGenerator). In
the case of YARN, the physical hostname will be used as the locationId.
In standalone, a combination of slice-name, slice-id, and instance-id
will be used as the locationId; see the sketch below.
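
A minimal sketch of the proposed types (field and method names are
assumptions):

    import java.util.Objects;

    public final class LocationId {
      private final String id;

      public LocationId(String id) {
        this.id = Objects.requireNonNull(id);
      }

      public String getId() {
        return id;
      }

      @Override
      public boolean equals(Object o) {
        return o instanceof LocationId && id.equals(((LocationId) o).id);
      }

      @Override
      public int hashCode() {
        return id.hashCode();
      }
    }

    // Pluggable, analogous to ProcessorIdGenerator. YARN would return
    // new LocationId(hostname); standalone would return something like
    // new LocationId(sliceName + "_" + sliceId + "_" + instanceId).
    interface LocationIdProvider {
      LocationId getLocationId();
    }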

> For Semantics of host affinity with run.id, “all stream processors of a samza
application stops” = one run is not true for YARN deployed Samza
applications.
    - Updated the proposal based upon this feedback.

> Lastly, from the grouper API definition, you will not be able to get the
physical location info, if it is not passed in via
currentGenerationProcessorIds or ContainerModel. How are you going to
resolve that w/o creating a LocalityManager in Grouper implementation
class? I would strongly recommend no to create an instance of
LocalityManager in the Grouper implementation class.
    - LocationId is part of the ContainerModel class and will be used
to propagate the previous run's locality information, so the grouper
does not need to instantiate a LocalityManager itself; see the sketch
below.
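
To illustrate (the signatures are assumptions, not the final grouper
API): the grouper can bucket the previous containers by LocationId and
prefer co-located processors during assignment, without constructing a
LocalityManager:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import org.apache.samza.job.model.ContainerModel;

    public class LocalityAwareGrouperHelper {
      // getLocationId() is the accessor this proposal adds to ContainerModel.
      public Map<LocationId, List<String>> processorsByLocation(
          Set<ContainerModel> previousContainers) {
        Map<LocationId, List<String>> byLocation = new HashMap<>();
        for (ContainerModel previous : previousContainers) {
          byLocation.computeIfAbsent(previous.getLocationId(), k -> new ArrayList<>())
                    .add(previous.getProcessorId());
        }
        return byLocation;
      }
    }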

> if the input topic partitions change, or the SSP grouper changes, the
task-to-ssp mapping changes and the task locality may not make sense at
all. Is this considered out-of-scope.

    - SystemStreamPartitionCountMonitor is out of scope for this
proposal. I think we can detect input-stream partition changes by
comparing the previous JobModel version against the current partition
counts of the input streams, and purge the task locality information on
a mismatch (see the sketch below). After SEP-5, the existing task
locality information can be reused.
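
A sketch of how that check could look, using the JobModel/TaskModel
accessors but hypothetical class and method names:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.samza.job.model.ContainerModel;
    import org.apache.samza.job.model.JobModel;
    import org.apache.samza.job.model.TaskModel;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.system.SystemStreamPartition;

    public class PartitionCountChangeCheck {
      // True when any input stream's partition count differs from the
      // previous run. Assumes each partition is assigned to exactly one task.
      public boolean partitionsChanged(JobModel previousJobModel,
          Map<SystemStream, Integer> currentPartitionCounts) {
        Map<SystemStream, Integer> previousCounts = new HashMap<>();
        for (ContainerModel container : previousJobModel.getContainers().values()) {
          for (TaskModel task : container.getTasks().values()) {
            for (SystemStreamPartition ssp : task.getSystemStreamPartitions()) {
              previousCounts.merge(ssp.getSystemStream(), 1, Integer::sum);
            }
          }
        }
        return !previousCounts.equals(currentPartitionCounts);
      }
    }

On a mismatch, the task locality entries for the affected streams would
be purged before regrouping.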

> not quite clear to me that what’s the distributed barrier is used for in
the graph. For every container process to pick up a certain version of
JobModel? Who is waiting on the barrier? The leader or the followers? Or
everyone?
    - Both the leader and the followers wait on the barrier to agree
upon a JobModel. Will add that to the state diagram; a conceptual
sketch is below.
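
Conceptually, the barrier step looks like the following (an
illustration only, not Samza's actual ZkBarrierForVersionUpgrade API;
the znode layout and field names are assumptions):

    import org.I0Itec.zkclient.ZkClient;

    public class JobModelBarrier {
      private final ZkClient zkClient;
      private final String localProcessorId;

      public JobModelBarrier(ZkClient zkClient, String localProcessorId) {
        this.zkClient = zkClient;
        this.localProcessorId = localProcessorId;
      }

      // Every processor, leader included, marks itself under the barrier
      // node for the published JobModel version, then blocks until all
      // known processors have done the same.
      public void await(String jobModelVersion, int processorCount)
          throws InterruptedException {
        String path = "/barrier_" + jobModelVersion;  // hypothetical layout
        zkClient.createPersistent(path + "/" + localProcessorId, true);
        while (zkClient.getChildren(path).size() < processorCount) {
          Thread.sleep(100); // real code would use a ZK watch, not polling
        }
      }
    }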

Thanks.
