[ https://issues.apache.org/jira/browse/SAMZA-516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286772#comment-14286772 ]
Chris Riccomini commented on SAMZA-516:
---------------------------------------

My current line of thinking is to write a StreamJob implementation that orchestrates jobs via ZooKeeper. The basic flow would look something like this:

h3. Starting a single process.

# Run {{run-job.sh}} on machine-1.
# {{run-job.sh}} uses the supplied job config to instantiate StandaloneJobFactory, and gets a StandaloneJob.
# StandaloneJob connects to ZK.
# StandaloneJob adds itself as a process in the {{job group}} ZK directory using an ephemeral node.
# StandaloneJob sets itself as a watcher on the {{job group}} container assignment directory.
## When StandaloneJob receives container assignments, it creates a new thread and instantiates a new SamzaContainer on that thread.
## When StandaloneJob loses a container assignment, it interrupts the appropriate thread, which shuts down the SamzaContainer.
# StandaloneJob checks whether a JobCoordinator leader has been elected.
# If there is no leader elected, StandaloneJob tries to elect itself as leader, and assigns containers to processes in the {{job group}}:
## Check the number of connected processes in the {{job group}}.
## Use job.container.count to create a JobCoordinator with containerCount set accordingly.
## Assign containers equally among all processes in the {{job group}}.
## Set a watcher on the {{job group}} ZK directory.
# The SamzaContainer then queries the JobCoordinator via its HTTP API to bootstrap itself.

h3. Starting a second process.

# All of the above steps are repeated. The second {{run-job.sh}} command could be run on machine-1, or on some other machine.
# A leader is already elected, so no leader election happens for the JobCoordinator.
# When this process registers itself in ZK (step (4), above), the watcher set in step (7.4) is notified.
# The watcher set in step (7.4) re-assigns partitions from the first process to the second.
# When reassignment happens, the watcher set in step (5) triggers the start of new containers in this process.

h3. Handling failover.

# The first process fails.
# Its ephemeral node times out of ZooKeeper.
# The second process is notified via ZK that the process in the {{job group}} that owned the JobCoordinator has failed.
# The second process elects itself as JobCoordinator leader.
# The new JobCoordinator diffs the list of processes in the {{job group}} against the container assignments.
# Any containers that were assigned to the dead process are shifted to live processes. This shift triggers the containers to start via the watcher defined in step (5.1).

h3. Notes

* The StandaloneJob could use processes instead of threads to run containers. This would allow task.opts to be set on SamzaContainers, and would also keep container code fully isolated from the JobCoordinator and StandaloneJob ZK logic.
* The process:container:stream task:partition nomenclature is pretty cumbersome. We're also adding a new concept here: something above the container. Currently, in YARN, the YARN container to SamzaContainer mapping is 1:1. The approach described above breaks this model, and makes it 1:*. The benefit is that we can shift SamzaContainers among Java processes without stopping existing containers. It also means we don't have to change any SamzaContainer code: everything in there can remain immutable.
* When we move the AM UI out of YARN and into the JobCoordinator, you'll be able to use the UI in standalone mode, but the UI will jump from node to node as machines fail. This seems kind of annoying.
* Does it make sense to have the JobCoordinator run as an independent process? When it's down, existing containers continue to run, but no new containers can be added (I think).
* This design doesn't support multi-job queries.
For example, if you wanted to run "SELECT member\_id, COUNT(*) FROM stream GROUP BY member\_id", you basically can't, since this query requires running two jobs (one to repartition, and one to aggregate).
* I haven't worked through the details on whether we could end up with orphaned processes that continue producing even after the coordinator thinks they're failed. This concern was voiced on the mailing list. If we can't safely rely on ZK to notify us when a container is ACTUALLY dead, then we could end up in split-brain scenarios, where we have multiple containers processing the same data.

> Support standalone Samza jobs
> -----------------------------
>
> Key: SAMZA-516
> URL: https://issues.apache.org/jira/browse/SAMZA-516
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.9.0
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
>
> Samza currently supports two modes of operation out of the box: local and YARN. With local mode, a single Java process starts the JobCoordinator, creates a single container, and executes it locally. All partitions are processed within this container. With YARN, a YARN grid is required to execute the Samza job. In addition, SAMZA-375 introduces a patch to run Samza in Mesos.
>
> There have been several requests lately to be able to run Samza jobs without any resource manager (YARN, Mesos, etc), but still run them in a distributed fashion.
>
> The goal of this ticket is to design and implement a samza-standalone module, which will:
> # Support executing a single Samza job in one or more containers.
> # Support failover, in cases where a machine is lost.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
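To make the flow above concrete, here is a minimal sketch of the pure logic behind three of the steps: leader election via the smallest ZK sequential-node number, the equal container assignment from step (7.3), and the failover re-assignment from "Handling failover". This is not actual Samza code; the class and method names ({{StandaloneSketch}}, {{electLeader}}, {{assignContainers}}, {{reassignOnFailure}}) are hypothetical, and the real implementation would drive these decisions from ZooKeeper watchers rather than plain method calls.

```java
import java.util.*;

// Hypothetical sketch of the StandaloneJob coordinator logic described above.
public class StandaloneSketch {

    // ZK-style leader election: each process registers an ephemeral sequential
    // node (e.g. "process-0000000003"); the smallest sequence number leads.
    public static String electLeader(List<String> ephemeralNodes) {
        return Collections.min(ephemeralNodes,
                Comparator.comparingInt(n ->
                        Integer.parseInt(n.substring(n.lastIndexOf('-') + 1))));
    }

    // Step (7.3): spread job.container.count containers round-robin across the
    // processes currently registered in the job group.
    public static Map<String, List<Integer>> assignContainers(
            List<String> processes, int containerCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String p : processes) {
            assignment.put(p, new ArrayList<>());
        }
        for (int c = 0; c < containerCount; c++) {
            assignment.get(processes.get(c % processes.size())).add(c);
        }
        return assignment;
    }

    // Failover: recompute the assignment over live processes only, so any
    // container owned by a dead process lands on a survivor. The watcher from
    // step (5.1) would then start/stop containers to match the new assignment.
    public static Map<String, List<Integer>> reassignOnFailure(
            Map<String, List<Integer>> current, Set<String> live, int containerCount) {
        List<String> survivors = new ArrayList<>();
        for (String p : current.keySet()) {
            if (live.contains(p)) {
                survivors.add(p);
            }
        }
        return assignContainers(survivors, containerCount);
    }

    public static void main(String[] args) {
        List<String> nodes = Arrays.asList("process-0000000002", "process-0000000005");
        System.out.println("leader: " + electLeader(nodes));

        Map<String, List<Integer>> a = assignContainers(Arrays.asList("p1", "p2"), 4);
        System.out.println("assignment: " + a);

        Map<String, List<Integer>> b =
                reassignOnFailure(a, Collections.singleton("p2"), 4);
        System.out.println("after p1 dies: " + b);
    }
}
```

Note that recomputing the whole assignment (rather than moving only the dead process's containers) is a simplification: it can also shuffle containers between surviving processes, which the design above avoids by shifting only the orphaned containers.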