[ 
https://issues.apache.org/jira/browse/SAMZA-316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14057097#comment-14057097
 ] 

Chris Riccomini commented on SAMZA-316:
---------------------------------------

bq. How is a client request routed to the right container? (Assuming Samza is 
running in a YARN cluster, we have no way of knowing the IP address and port 
number of a container a priori. Also, the endpoint may change as containers 
fail and are restarted.)

This could be discovered by talking to the job's AM. The AM will know which 
containers are running, and on which hosts.
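To make the lookup concrete, here's a toy sketch. `ContainerRegistry` is a made-up stand-in for the AM's bookkeeping (not a real Samza or YARN class); it just shows the register-then-lookup flow a client would go through before connecting, including the case where a container restarts on a new host/port.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the AM's container bookkeeping. The real AM
// would expose this over its own RPC/HTTP interface; this just shows the
// lookup a client performs before connecting.
public class ContainerRegistry {
    private final Map<Integer, String> endpoints = new ConcurrentHashMap<>();

    // Called when a container starts (or restarts on a new host/port).
    public void register(int containerId, String hostAndPort) {
        endpoints.put(containerId, hostAndPort);
    }

    // Called by a client, via the AM, to find the container it cares about.
    public String lookup(int containerId) {
        return endpoints.get(containerId);
    }

    public static void main(String[] args) {
        ContainerRegistry am = new ContainerRegistry();
        am.register(0, "host-a:12345");
        am.register(0, "host-b:23456"); // container 0 restarted elsewhere
        System.out.println(am.lookup(0)); // prints host-b:23456
    }
}
```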

bq. For (2) I propose the following: a network service is implemented as just 
another system (i.e. implementing SystemFactory) in Samza. Every incoming 
request from a client is wrapped in an IncomingMessageEnvelope, and goes 
through the same MessageChooser flow as everything else. This ensures that 
StreamTask remains single-threaded and easy to reason about.

Interesting. This is really creative. I like that it isolates this feature 
outside of samza-core, so people who don't use it don't end up pulling in a 
ton of Jetty dependencies (or whatever). I have been really reluctant to add an 
HTTP server to SamzaContainer or TaskInstances because I think it clutters 
everything up. This is a really nice way to get the functionality without 
cluttering anything. In fact, I think no changes need to be made to Samza, 
right?
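To sanity-check my own understanding of the flow: the server's I/O threads enqueue requests, and the single container thread drains them, same as any SystemConsumer feeding the MessageChooser. A toy sketch with placeholder types (`Envelope` here is not the real IncomingMessageEnvelope):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified stand-in for Samza's IncomingMessageEnvelope, just enough
// to show the shape of the flow.
class Envelope {
    final String stream;   // one stream name per client connection
    final String message;
    Envelope(String stream, String message) {
        this.stream = stream;
        this.message = message;
    }
}

public class NetworkSystemSketch {
    // The single container thread drains the queue, so the task logic
    // stays single-threaded just like a normal StreamTask.
    static List<String> drain(BlockingQueue<Envelope> inbox) {
        List<String> handled = new ArrayList<>();
        Envelope e;
        while ((e = inbox.poll()) != null) {
            handled.add(e.stream + " -> " + e.message);
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        // The network server's I/O threads would enqueue here, playing
        // the role of a SystemConsumer.
        BlockingQueue<Envelope> inbox = new LinkedBlockingQueue<>();
        inbox.put(new Envelope("conn-1", "GET /state/foo"));
        inbox.put(new Envelope("conn-2", "GET /state/bar"));
        System.out.println(drain(inbox));
        // prints [conn-1 -> GET /state/foo, conn-2 -> GET /state/bar]
    }
}
```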

bq. If a container is shut down, all of its clients will be disconnected and 
will need to reconnect. I don't think we need to worry about preserving 
connections across container restarts.

Agree.

bq. Point (1) is a service discovery problem

Agree. Regardless of which solution is picked, it can, again, be isolated from 
the rest of Samza by having the SystemFactory's consumer register itself. I've 
been reluctant to add a direct ZK dependency to SamzaContainer/TaskInstance, 
and have thus far avoided it (we have indirect dependencies through Kafka, when 
it's used).

I have shied away from this feature because I couldn't figure out a way to do 
it that wasn't intrusive to the code base. Your proposal seems like a really 
clean way to isolate this feature in its own package and module away from 
everything else.

> Allow jobs to expose network service endpoints
> ----------------------------------------------
>
>                 Key: SAMZA-316
>                 URL: https://issues.apache.org/jira/browse/SAMZA-316
>             Project: Samza
>          Issue Type: New Feature
>          Components: container
>    Affects Versions: 0.7.0
>            Reporter: Martin Kleppmann
>
> At the moment, the only way of contacting a task instance in a Samza job is 
> to send a message to one of the job's input streams. Sometimes it would be 
> useful to be able to contact a task instance directly, as a TCP network 
> service. Example use cases:
> - Allowing a remote client to query a Samza job's state store (which may, in 
> some cases, obviate the need for writing job output into a separate, publicly 
> readable database).
> - Allowing a client to "tap into" a stream without consuming the entire 
> stream. For example, a client may wish to be notified about all log messages 
> matching a particular regular expression as long as they are connected (the 
> notifications stop when they disconnect).
> - Performing an expensive on-demand computation quickly by parallelizing it 
> across many tasks in one or more Samza jobs, like in Storm's [distributed 
> RPC|https://storm.incubator.apache.org/documentation/Distributed-RPC.html].
> These use cases can be implemented by running a network server (e.g. an HTTP 
> server, a WebSocket server, a Thrift RPC server, etc.) in each Samza 
> container. We then need to solve two problems:
> # How is a client request routed to the right container? (Assuming Samza is 
> running in a YARN cluster, we have no way of knowing the IP address and port 
> number of a container a priori. Also, the endpoint may change as containers 
> fail and are restarted.)
> # How are requests from remote clients integrated with Samza's StreamTask 
> programming model? (Requests may arrive at any time; responses may be 
> delivered immediately or asynchronously at some later time; depending on the 
> protocol, there may be a stream of requests and responses on a single client 
> connection.)
> Point (1) is a service discovery problem, for which there are many possible 
> solutions, but no one obvious winner. [Helix|http://helix.apache.org/] could 
> be used for this (although Helix also provides various cluster management 
> features that we probably wouldn't need). YARN is considering integrating a 
> service discovery mechanism (YARN-913). [Finagle 
> ServerSets|http://stevenskelton.ca/finagle-serverset-clusters-using-zookeeper/]
>  and [Rest.li D2|https://github.com/linkedin/rest.li/wiki/Dynamic-Discovery] 
> both use ZooKeeper for service discovery. Looking beyond the end of our 
> JVM-based nose, projects like [Consul|http://www.consul.io/], 
> [SkyDNS|http://blog.gopheracademy.com/skydns] and 
> [etcd|https://github.com/coreos/etcd] also provide service discovery. It 
> would be worth surveying the landscape and figuring out the various pros and 
> cons before settling on one particular service discovery mechanism. In 
> particular, we should keep in mind the needs of clients that are not written 
> in a JVM-based language.
> Whatever service discovery solution is chosen, we will need to decide whether 
> to use a separate TCP port for each TaskInstance within a container, or 
> whether to use some application-level mechanism for deciding which 
> TaskInstance should process a particular incoming request.
> For (2) I propose the following: a network service is implemented as just 
> another system (i.e. implementing SystemFactory) in Samza. Every incoming 
> request from a client is wrapped in an IncomingMessageEnvelope, and goes 
> through the same MessageChooser flow as everything else. This ensures that 
> StreamTask remains single-threaded and easy to reason about.
> Each connection from a client is given a unique stream name. This allows the 
> StreamTask to tell which requests came from the same client. In order to send 
> a response to a client, the StreamTask sends an OutgoingMessageEnvelope to an 
> output stream, using the same system and stream name as the incoming request. 
> This means that a StreamTask can generate a response immediately if it wants 
> to, or it could send a response asynchronously at some later point in time. 
> It also works for protocols that can have many requests and responses on a 
> long-lived connection, e.g. WebSocket.
> Special incoming messages can be used to indicate that a client has connected 
> or disconnected (allowing cleanup of any per-client information in the 
> StreamTask), and a special outgoing message can be used to tell the network 
> service to disconnect a particular client.
> If a container is shut down, all of its clients will be disconnected and will 
> need to reconnect. I don't think we need to worry about preserving 
> connections across container restarts.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
