[ 
https://issues.apache.org/jira/browse/SAMZA-516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14286772#comment-14286772
 ] 

Chris Riccomini commented on SAMZA-516:
---------------------------------------

My current line of thinking is to write a StreamJob implementation that 
orchestrates jobs via ZooKeeper. The basic flow would look something like this:

h3. Starting a single process.

# Run {{run-job.sh}} on machine-1.
# {{run-job.sh}} uses the supplied job config to instantiate 
StandaloneJobFactory, and gets a StandaloneJob.
# StandaloneJob connects to ZK.
# StandaloneJob adds itself as a process in the {{job group}} ZK directory 
using an ephemeral node.
# StandaloneJob sets itself as a watcher in the {{job group}} container 
assignment directory.
## When StandaloneJob receives container assignments, it creates a new thread, 
and instantiates a new SamzaContainer on the thread.
## When StandaloneJob loses a container assignment, it interrupts the 
appropriate thread, which shuts down the SamzaContainer.
# StandaloneJob checks if there is a JobCoordinator leader elected.
# If there is no leader elected, StandaloneJob tries to elect itself as leader, 
and assigns containers to processes in the {{job group}}.
## Check for the number of connected processes in the {{job group}}.
## Use job.container.count to create a JobCoordinator with containerCount set 
accordingly.
## Assign containers equally among all processes in the {{job group}}.
## Set a watcher on the {{job group}} ZK directory.
# Each SamzaContainer then queries the JobCoordinator via its HTTP API to fetch 
its configuration and start up.
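
As a rough sketch of the equal-assignment step (7.3): spread the containers 
round-robin across the connected processes in the {{job group}}. The 
{{ContainerAssigner}} class and method names here are hypothetical, not part of 
any existing Samza API:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: assign containerCount containers round-robin
// across the processes currently registered in the job group.
public class ContainerAssigner {
    public static Map<String, List<Integer>> assign(List<String> processes, int containerCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String process : processes) {
            assignment.put(process, new ArrayList<>());
        }
        for (int container = 0; container < containerCount; container++) {
            // Round-robin keeps the per-process counts within one of each other.
            String process = processes.get(container % processes.size());
            assignment.get(process).add(container);
        }
        return assignment;
    }
}
```

With a single registered process and job.container.count=4, that process gets 
all four containers; once a second process registers, re-running the assignment 
splits them two and two.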

h3. Starting a second process.

# All of the above steps are repeated. The second {{run-job.sh}} command could 
be run on machine-1, or some other machine.
# A leader is already elected, so no leadership election happens for the 
JobCoordinator.
# When this process registers itself in ZK (step (4), above), the watcher set 
in step (7.4) will be notified. 
# Watcher set in (7.4) will re-assign partitions from the first process to the 
second.
# When reassignment happens, watcher set in (5) will trigger the start of new 
containers in this process.
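
The thread handling in steps (5.1) and (5.2) could look roughly like the 
following. {{ContainerRunner}} is a made-up name, and the {{Runnable}} stands 
in for a real SamzaContainer; this is a sketch of the interrupt-to-shutdown 
idea, not settled design:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: run each assigned container on its own thread,
// and interrupt that thread when the assignment is lost.
public class ContainerRunner {
    private final Map<Integer, Thread> running = new HashMap<>();

    // Called when the watcher observes a new container assignment (step 5.1).
    public void startContainer(int containerId, Runnable container) {
        Thread t = new Thread(container, "samza-container-" + containerId);
        running.put(containerId, t);
        t.start();
    }

    // Called when the watcher observes a lost assignment (step 5.2):
    // interrupt the thread so the container shuts down, then wait for it.
    public void stopContainer(int containerId) throws InterruptedException {
        Thread t = running.remove(containerId);
        if (t != null) {
            t.interrupt();
            t.join();
        }
    }
}
```

This only works cleanly if the container's run loop treats interruption as a 
shutdown signal, which is part of what would need verifying.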

h3. Handling failover.

# The first process fails.
# Its ZooKeeper session times out, and its ephemeral node is removed.
# The second process is notified via ZK that the process in the {{job group}} 
that was the owner of the JobCoordinator has failed.
# The second process elects itself as JobCoordinator leader.
# The new JobCoordinator diffs the list of processes in the {{job group}} with 
the container assignments.
# Any containers that were assigned to the dead process are shifted to live 
processes. This shift triggers the containers to start via the watcher defined 
in step (5.1).
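
A rough sketch of that diff-and-shift step, assuming orphaned containers go to 
the least-loaded live process ({{FailoverRebalancer}} and the balancing policy 
are mine, not settled design):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: diff the current assignment against the live
// processes in the job group, and move orphaned containers to live ones.
public class FailoverRebalancer {
    public static Map<String, List<Integer>> rebalance(
            Map<String, List<Integer>> current, Set<String> liveProcesses) {
        Map<String, List<Integer>> next = new LinkedHashMap<>();
        List<Integer> orphaned = new ArrayList<>();
        for (Map.Entry<String, List<Integer>> entry : current.entrySet()) {
            if (liveProcesses.contains(entry.getKey())) {
                next.put(entry.getKey(), new ArrayList<>(entry.getValue()));
            } else {
                // These containers belonged to a dead process.
                orphaned.addAll(entry.getValue());
            }
        }
        // Hand each orphaned container to the currently least-loaded live process.
        for (int container : orphaned) {
            String target = Collections.min(next.keySet(),
                Comparator.comparingInt(p -> next.get(p).size()));
            next.get(target).add(container);
        }
        return next;
    }
}
```

Whether this minimal-movement approach is right (versus recomputing the whole 
assignment from scratch) is one of the open questions.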

h3. Notes

* The StandaloneJob could use processes instead of threads to run containers. 
This would allow task.opts to be set on SamzaContainers, and would also keep 
container code fully isolated from the JobCoordinator and the StandaloneJob's 
ZK logic.
* The process:container:stream task:partition nomenclature is pretty 
cumbersome. We're also adding a new concept here: something above the 
container. Currently, in YARN, the YARN container to SamzaContainer mapping is 
1:1. The approach described above breaks this model, and makes it 1:*. The 
benefit is that we can shift SamzaContainers amongst Java processes without 
stopping existing containers. It also means we don't have to change any 
SamzaContainer code; everything in there can remain immutable.
* When we move the AM UI out of YARN and into JobCoordinator, you'll be able to 
use the UI in standalone mode, but the UI will jump from node to node as 
machines fail. This seems kind of annoying.
* Does it make sense to have the JobCoordinator run as an independent process? 
When it's down, existing containers continue to run, but no new containers can 
be added (I think).
* This design doesn't support multi-job queries. For example, if you wanted to 
run "SELECT member_id, COUNT(*) FROM stream GROUP BY member_id", you 
basically can't, since this query requires running two jobs (one to 
repartition, and one to aggregate).
* I haven't worked through the details on whether we could end up with orphaned 
processes that continue producing even after the coordinator thinks they're 
failed. This concern was voiced on the mailing list. If we can't safely rely on 
ZK to notify us when a container is ACTUALLY dead, then we could end up in 
split-brain scenarios, where we have multiple containers processing the same 
data.

> Support standalone Samza jobs
> -----------------------------
>
>                 Key: SAMZA-516
>                 URL: https://issues.apache.org/jira/browse/SAMZA-516
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.9.0
>            Reporter: Chris Riccomini
>            Assignee: Chris Riccomini
>
> Samza currently supports two modes of operation out of the box: local and 
> YARN. With local mode, a single Java process starts the JobCoordinator, 
> creates a single container, and executes it locally. All partitions are 
> processed within this container. With YARN, a YARN grid is required to 
> execute the Samza job. In addition, SAMZA-375 introduces a patch to run Samza 
> in Mesos.
> There have been several requests lately to be able to run Samza jobs without 
> any resource manager (YARN, Mesos, etc), but still run it in a distributed 
> fashion.
> The goal of this ticket is to design and implement a samza-standalone module, 
> which will:
> # Support executing a single Samza job in one or more containers.
> # Support failover, in cases where a machine is lost.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
