Hey Jae,

> So, we need to find out whether running Samza on Mesos won't create
> that problem, or whether Spark Streaming won't have that issue. In the
> worst case, creating our own distributed coordination might be more
> predictable than running YARN on EMR.

I think that there are two ways to fix this. One is to have the Kafka
broker detect that there are two producers that are "the same", and start
dropping messages from the "old one" (and perhaps throw an exception to
the old producer). The other way is to have the Samza container detect the
problem, and kill itself.

The Kafka-based approach is a subset of the transactionality feature
described here:

  https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

The problem with the Kafka approach is that 1) it's Kafka-specific, and 2)
the generation ID required to drop messages from an orphaned producer
hasn't been implemented, except in a branch that hasn't been committed.

So, if we accept that Kafka shouldn't be the mechanism for detecting
orphaned containers, the fix will have to go into Samza. Within Samza,
there are two approaches: use the resource scheduler (YARN, Mesos, etc.)
to detect the problem, or use Samza itself to detect it.

A YARN-specific example of how to solve the problem would be to have the
SamzaContainer periodically poll its local NM's REST endpoint:

  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info

to check the node's status, its last update time, etc. If the REST
endpoint can't be reached, the node reports itself as unhealthy, or the
last update time is older than some threshold, the container could kill
itself. Again, this is YARN-specific.
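
To make that concrete, here's a rough sketch of such a watchdog. The
/ws/v1/node/info endpoint and the nodeHealthy/lastNodeUpdateTime fields
come from the NM's REST API, but the port, the staleness threshold, and
the grep-style JSON handling are placeholders, not a real implementation:

  import java.net.HttpURLConnection;
  import java.net.URL;
  import java.util.Scanner;
  import java.util.regex.Matcher;
  import java.util.regex.Pattern;

  public class NodeManagerWatchdog implements Runnable {
    // Assumptions: 8042 is the NM's default web port, and 60s is an
    // arbitrary staleness threshold.
    private static final String NM_INFO_URL =
        "http://localhost:8042/ws/v1/node/info";
    private static final long MAX_STALENESS_MS = 60 * 1000L;

    public void run() {
      try {
        HttpURLConnection conn =
            (HttpURLConnection) new URL(NM_INFO_URL).openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(5000);
        String body = new Scanner(conn.getInputStream(), "UTF-8")
            .useDelimiter("\\A").next();
        // A real implementation would parse the JSON properly; these
        // grep-style checks just keep the sketch short.
        if (!body.contains("\"nodeHealthy\":true")) {
          System.exit(1); // node says it's unhealthy: kill ourselves
        }
        Matcher m =
            Pattern.compile("\"lastNodeUpdateTime\":(\\d+)").matcher(body);
        if (m.find() && System.currentTimeMillis()
            - Long.parseLong(m.group(1)) > MAX_STALENESS_MS) {
          System.exit(1); // NM hasn't updated recently: assume split-brain
        }
      } catch (Exception e) {
        System.exit(1); // NM unreachable: assume we're orphaned
      }
    }
  }

You'd run this from a scheduled background thread (e.g. a
ScheduledExecutorService) so it still fires while the main thread is
stuck in process().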

I am not sure how Mesos handles split-brain. I've asked Tim Chen on
SAMZA-375:

  https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204

The last solution I mentioned, using Samza directly (no dependency on
Kafka, YARN, Mesos, etc.), seems like the best long-term approach to me.
We can either 1) introduce a heartbeat message into the coordinator
stream, or 2) use the existing checkpoint messages as heartbeats. There
is some complexity to this solution that would need to be thought
through, though. For example, should the heartbeat messages be sent from
the main thread? What happens if the main thread is blocked in process()
for an extended period of time?
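
One way to handle the blocked-main-thread case would be to heartbeat
from a separate thread, but only while the main loop is demonstrably
making progress. A minimal sketch, where CoordinatorStreamWriter is just
a stand-in for whatever would actually publish to the coordinator stream
(it's not an existing Samza class):

  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;

  public class ContainerHeartbeat {
    // Stand-in for whatever would actually write to the coordinator
    // stream; not an existing Samza class.
    public interface CoordinatorStreamWriter {
      void sendHeartbeat();
    }

    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private volatile long lastProgressMs = System.currentTimeMillis();

    // The main loop calls this after each process()/window() call.
    public void recordProgress() {
      lastProgressMs = System.currentTimeMillis();
    }

    public void start(final CoordinatorStreamWriter writer,
                      final long maxStallMs) {
      scheduler.scheduleAtFixedRate(new Runnable() {
        public void run() {
          // Heartbeat only while the main thread is making progress, so
          // a container wedged in process() eventually looks dead.
          if (System.currentTimeMillis() - lastProgressMs < maxStallMs) {
            writer.sendHeartbeat();
          }
        }
      }, 0, 10, TimeUnit.SECONDS);
    }
  }

With something like this, a container stuck in process() stops
heartbeating on its own, and whoever consumes the heartbeats can fence
it off.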

What do others think? As a short-term fix, it seems to me like YARN/Mesos
should handle this automatically for us. Has anyone had experience with
orphaned containers in Mesos?

> I would really appreciate it if you could give me some guidelines about
> implementing a custom cluster-management interface for Samza.

Samza jobs are started through bin/run-job.sh (inside samza-shell). This
CLI uses JobRunner to instantiate a StreamJobFactory (defined with
job.factory.class), which returns a StreamJob. To implement your own
cluster management, the first thing you'll need to do is implement
StreamJobFactory and StreamJob. You can have a look at YarnJob or
ProcessJob/ProcessJobFactory for an example of how to do this.
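
To give a feel for the shape of it, here's a bare-bones skeleton against
those interfaces (the class names and everything EC2-specific are
illustrative; double-check the method signatures against the Samza
version you're on):

  import org.apache.samza.config.Config;
  import org.apache.samza.job.ApplicationStatus;
  import org.apache.samza.job.StreamJob;
  import org.apache.samza.job.StreamJobFactory;

  public class Ec2JobFactory implements StreamJobFactory {
    public StreamJob getJob(Config config) {
      return new Ec2Job(config);
    }
  }

  class Ec2Job implements StreamJob {
    private final Config config;
    private ApplicationStatus status = ApplicationStatus.New;

    Ec2Job(Config config) {
      this.config = config;
    }

    public StreamJob submit() {
      // TODO: launch a SamzaContainer per instance in the ASG
      status = ApplicationStatus.Running;
      return this;
    }

    public StreamJob kill() {
      // TODO: tear the containers down
      status = ApplicationStatus.UnsuccessfulFinish;
      return this;
    }

    // TODO: these should block until the containers actually get there
    public ApplicationStatus waitForFinish(long timeoutMs) {
      return status;
    }

    public ApplicationStatus waitForStatus(ApplicationStatus s,
                                           long timeoutMs) {
      return status;
    }

    public ApplicationStatus getStatus() {
      return status;
    }
  }

Then set job.factory.class to point at Ec2JobFactory in your job config.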

Note that this code has changed slightly between 0.8.0 and master (0.9.0).
In 0.9.0, the partition-to-container assignment logic has been pulled out
of YARN's AM, and into a JobCoordinator class.

The trick with adding EC2 ASG support is going to be handling partition
shifting when a new node is added to the group. For example, if you have
two machines, each running one container, and you add a third machine,
some of the input partitions (and corresponding StreamTasks) need to be
shifted from the two machines on to the third. The only way to do this
right now (sketched after the list) is to:

1. Stop all containers.
2. Re-instantiate the JobCoordinator with a new container count.
3. Start new containers on all three machines with the new partition
assignments.
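
Continuing the hypothetical Ec2Job skeleton from above, that
restart-based flow might look roughly like this; withContainerCount()
and the config key it writes are assumptions, not settled API:

  import java.util.HashMap;
  import java.util.Map;

  import org.apache.samza.config.Config;
  import org.apache.samza.config.MapConfig;
  import org.apache.samza.job.ApplicationStatus;
  import org.apache.samza.job.StreamJob;

  public class Rescaler {
    public void rescale(StreamJob running, Config oldConfig, int newCount) {
      running.kill();                                   // 1. stop containers
      running.waitForStatus(ApplicationStatus.UnsuccessfulFinish, 60000);
      Config newConfig =
          withContainerCount(oldConfig, newCount);      // 2. new count
      new Ec2JobFactory().getJob(newConfig).submit();   // 3. restart
    }

    // The config key is an assumption; in 0.9 the count lives under a
    // scheduler-specific key such as yarn.container.count.
    private Config withContainerCount(Config config, int count) {
      Map<String, String> map = new HashMap<String, String>(config);
      map.put("yarn.container.count", Integer.toString(count));
      return new MapConfig(map);
    }
  }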

In an ideal world, steps (1-3) would be handled automatically by Samza,
and wouldn't require container restarts. This is precisely what
samza-standalone will accomplish. If you're interested in contributing to
samza-standalone, that would be awesome. I'm working on a design doc right
now, which I'm trying to post by EOW. Once that's done, we can collaborate
on design and split the code up, if you'd like.


Cheers,
Chris

On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:

>Hi Samza Devs
>
>The significant concern I've had recently is container leaks. The data
>pipeline based on Samza can guarantee at-least-once delivery, but the
>duplicate rate is over 1.0%, and I am getting alerts right now. Container
>leaks will push a lot of alerts to me.
>
>So, we need to find out whether running Samza on Mesos won't create that
>problem, or whether Spark Streaming won't have that issue. In the worst
>case, creating our own distributed coordination might be more predictable
>than running YARN on EMR.
>
>What about standalone Samza? If this is quite plausible and the best
>solution in the near future, I want to be able to contribute. Could you
>share your thoughts or plans?
>
>I would really appreciate it if you could give me some guidelines about
>implementing a custom cluster-management interface for Samza. If it's
>possible, I want to take a look at replacing the YARN support with EC2
>ASG stuff.
>
>Thank you
>Best, Jae
