Hey all,

Also, I just opened this ticket to track work on samza-standalone:

  https://issues.apache.org/jira/browse/SAMZA-516

Cheers,
Chris

On 1/21/15 1:32 PM, "Chris Riccomini" <[email protected]> wrote:

>Hey Jae,
>
>> So, we need to find out whether running Samza on Mesos avoids that
>>problem, or whether Spark Streaming avoids it. In the worst case,
>>creating our own distributed coordination might be more predictable
>>than running YARN on EMR.
>
>I think that there are two ways to fix this. One is to have the Kafka
>broker detect that there are two producers that are "the same", and start
>dropping messages from the "old one" (and perhaps throw an exception to
>the old producer). The other way is to have the Samza container detect the
>problem, and kill itself.
>
>The Kafka-based approach is a subset of the transactionality feature
>described here:
>
>  https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>
>The problem with the Kafka approach is that 1) it's Kafka-specific, and 2)
>the generation id required to drop messages from an orphaned producer
>hasn't been implemented, except in a branch that hasn't been committed.
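
To make the broker-side option concrete, here is a toy Python sketch of generation-based fencing. Everything in it (FencingLog, StaleProducerError, the integer generation) is hypothetical and exists only for illustration. It is not Kafka's API, and the generation id it relies on is exactly the piece that has not been implemented.

```python
class StaleProducerError(Exception):
    """Raised to a producer whose generation has been superseded."""


class FencingLog:
    """Toy stand-in for a broker partition that fences orphaned producers."""

    def __init__(self):
        self.messages = []
        self._latest_generation = {}  # producer id -> highest generation seen

    def append(self, producer_id, generation, message):
        latest = self._latest_generation.get(producer_id, -1)
        if generation < latest:
            # An older incarnation of the same logical producer:
            # drop the message and tell the old producer about it.
            raise StaleProducerError(
                f"{producer_id} generation {generation} superseded by {latest}")
        self._latest_generation[producer_id] = generation
        self.messages.append((producer_id, generation, message))


log = FencingLog()
log.append("container-0", 1, "a")
log.append("container-0", 2, "b")  # replacement container starts up
try:
    log.append("container-0", 1, "c")  # the orphan is still writing
    orphan_rejected = False
except StaleProducerError:
    orphan_rejected = True
```

The orphan's write is rejected as soon as a newer generation has been seen, which is the "drop messages from the old one and throw an exception" behavior described above.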
>
>So, if we accept that we shouldn't use Kafka as the solution for detecting
>orphaned containers, the solution will have to go into Samza. Within
>Samza, there are two approaches. One is to use the resource scheduler
>(YARN, Mesos, etc.) to detect the problem. The other solution is to use
>Samza, itself, to detect the problem.
>
>A YARN-specific example of how to solve the problem would be to have the
>SamzaContainer periodically poll its local NM's REST endpoint:
>
>  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>
>to see what the node's status is, when it was last updated, etc. If the
>REST endpoint can't be reached, the node reports itself unhealthy, or the
>last update is older than some threshold, the container could kill itself.
>Again, this is YARN-specific.
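
A minimal Python sketch of that check, to show the decision logic rather than a real SamzaContainer change. The function names and the threshold are made up. The JSON field names (nodeInfo, nodeHealthy, lastNodeUpdateTime in milliseconds) are what I recall from the NodeManager REST docs and should be verified against your Hadoop version.

```python
import http.client
import json
import urllib.request


def fetch_node_info(url, timeout_s=5.0):
    """Return the parsed JSON body, or None if the NM can't be reached."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return json.load(resp)
    except (OSError, ValueError, http.client.HTTPException):
        return None


def should_kill(info, now_ms, max_age_ms):
    """Decide whether the container should shut itself down."""
    if info is None:
        return True  # REST endpoint unreachable
    node = info.get("nodeInfo", {})
    if not node.get("nodeHealthy", False):
        return True  # NM reports the node as unhealthy
    last_update = node.get("lastNodeUpdateTime")
    if last_update is None:
        return True  # treat a missing timestamp as stale
    return now_ms - last_update > max_age_ms


# Hypothetical responses, shaped like the assumed NM payload.
healthy = {"nodeInfo": {"nodeHealthy": True, "lastNodeUpdateTime": 1_000_000}}
unhealthy = {"nodeInfo": {"nodeHealthy": False, "lastNodeUpdateTime": 1_000_000}}
```

A container would call fetch_node_info on a timer, pass the result to should_kill with the current time in milliseconds, and exit if it returns True. Keeping the decision separate from the HTTP call makes it easy to test without a running NM.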
>
>I am not sure how Mesos handles split-brain. I've asked Tim Chen on
>SAMZA-375:
>
>  https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204
>
>The last solution that I mentioned, using Samza directly (no dependency on
>Kafka, YARN, Mesos, etc), seems like the best long-term solution to me. We
>can either 1) introduce a heartbeat message into the coordinator stream,
>or 2) use the existing checkpoint message as a heartbeat.  There is some
>complexity to this solution that would need to be thought through, though.
>For example, should the heartbeat messages be sent from the main thread?
>What happens if the main thread is blocked on process() for an extended
>period of time?
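
One way to think about the main-thread question is a heartbeat sent from a separate thread that also reports how much work the main loop has done. This is only a sketch: the Heartbeater class and its send callback are hypothetical stand-ins for writing to the coordinator stream, and nothing here is existing Samza code.

```python
import threading
import time


class Heartbeater:
    """Sends heartbeats from a background thread at a fixed interval."""

    def __init__(self, send, interval_s):
        self._send = send
        self._interval_s = interval_s
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self.processed = 0  # the main loop bumps this after each process() call

    def _run(self):
        # Event.wait returns False on timeout and True once stop() is called.
        while not self._stop.wait(self._interval_s):
            self._send({"time": time.time(), "processed": self.processed})

    def start(self):
        self._thread.start()

    def stop(self):
        self._stop.set()
        self._thread.join()


beats = []
hb = Heartbeater(beats.append, interval_s=0.01)
hb.start()
time.sleep(0.2)  # stands in for a main thread blocked inside process()
hb.stop()
```

Heartbeats keep flowing while the main thread is blocked, so the container is not declared dead just because process() is slow. The processed counter stays flat in that case, which lets whoever reads the heartbeats tell "alive but stuck" apart from "alive and making progress". Sending from the main thread instead would conflate the two.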
>
>What do others think? As a short-term fix, it seems to me like YARN/Mesos
>should handle this automatically for us. Has anyone had experience with
>orphaned containers in Mesos?
>
>> I would really appreciate it if you could give me some guidelines on
>>implementing a custom cluster management interface for Samza.
>
>Samza jobs are started through bin/run-job.sh (inside samza-shell). This
>CLI uses JobRunner to instantiate a StreamJobFactory (defined with
>job.factory.class), which returns a StreamJob. To implement your own
>cluster management, the first thing you'll need to do is implement
>StreamJobFactory and StreamJob. You can have a look at YarnJob or
>ProcessJob/ProcessJobFactory for an example of how to do this.
>
>Note that this code has changed slightly between 0.8.0 and master (0.9.0).
>In 0.9.0, the partition-to-container assignment logic has been pulled out
>of YARN's AM, and into a JobCoordinator class.
>
>The trick with adding EC2 ASG is going to be in handling partition
>shifting when a new node is added to the group. For example, if you have
>two machines, each running one container, and you add a third machine,
>some of the input partitions (and corresponding StreamTasks) need to be
>shifted from the two machines on to the third. The only way to do this
>right now is to:
>
>1. Stop all containers.
>2. Re-instantiate the JobCoordinator with a new container count.
>3. Start new containers on all three machines with the new partition
>assignments.
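
To illustrate why going from two containers to three touches every machine, here is a small Python sketch. It assumes a simple round-robin (partition modulo container count) assignment, which may differ from what the JobCoordinator actually does. The assign and moved helpers are hypothetical.

```python
def assign(partitions, container_count):
    """Round-robin partitions across containers (assumed strategy)."""
    assignment = {c: [] for c in range(container_count)}
    for p in partitions:
        assignment[p % container_count].append(p)
    return assignment


def moved(before, after):
    """Return the partitions whose owning container changed."""
    owner_before = {p: c for c, ps in before.items() for p in ps}
    owner_after = {p: c for c, ps in after.items() for p in ps}
    return sorted(p for p in owner_before if owner_before[p] != owner_after[p])


two = assign(range(6), 2)    # two machines, one container each
three = assign(range(6), 3)  # a third machine joins the group
shifted = moved(two, three)
```

With six partitions, four of them change owner, and some of those move between the two original containers rather than onto the new one. Under this assumed strategy, that is why all containers are stopped and restarted with the new assignments instead of only starting the third.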
>
>In an ideal world, steps (1-3) would be handled automatically by Samza,
>and wouldn't require container restarts. This is precisely what
>samza-standalone will accomplish. If you're interested in contributing to
>samza-standalone, that would be awesome. I'm working on a design doc right
>now, which I'm trying to post by EOW. Once that's done, we can collaborate
>on design and split the code up, if you'd like.
>
>
>Cheers,
>Chris
>
>On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:
>
>>Hi Samza Devs
>>
>>My most significant recent concern is container leaks. The data
>>pipeline based on Samza can guarantee at-least-once delivery, but the
>>duplicate rate is over 1.0% and I am getting alerts right now. Container
>>leaks will push a lot of alerts to me.
>>
>>So, we need to find out whether running Samza on Mesos avoids that
>>problem, or whether Spark Streaming avoids it. In the worst case,
>>creating our own distributed coordination might be more predictable
>>than running YARN on EMR.
>>
>>What about standalone Samza? If this is quite plausible and the best
>>solution in the near future, I want to be able to contribute. Could you
>>share your thoughts or plans?
>>
>>I would really appreciate it if you could give me some guidelines on
>>implementing a custom cluster management interface for Samza. If
>>possible, I want to look into replacing the YARN support with EC2 ASG.
>>
>>Thank you
>>Best, Jae
>
