Hey Jae,

> So, we need to find out running Samza on Mesos won't create that
> problem, or Spark Streaming won't have that issue. In the worst case,
> creating our own distribution coordination might be more predictable
> instead of running Yarn on EMR.
I think that there are two ways to fix this. One is to have the Kafka broker detect that there are two producers that are "the same", and start dropping messages from the "old one" (and perhaps throw an exception to the old producer). The other way is to have the Samza container detect the problem and kill itself.

The Kafka-based approach is a subset of the transactionality feature described here:

https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka

The problem with the Kafka approach is that 1) it's Kafka-specific, and 2) the generation id required to drop messages from an orphaned producer hasn't been implemented, except in a branch that's not been committed. So, if we accept that we shouldn't use Kafka as the solution for detecting orphaned containers, the solution will have to go into Samza.

Within Samza, there are two approaches. One is to use the resource scheduler (YARN, Mesos, etc.) to detect the problem. The other is to use Samza itself to detect the problem.

A YARN-specific example of how to solve the problem would be to have the SamzaContainer periodically poll its local NM's REST endpoint:

http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info

to see what the status is, its last update time, etc. If the REST endpoint can't be reached, the node is unhealthy, or the last update time is older than some threshold, the container could kill itself. Again, this is YARN-specific. I am not sure how Mesos handles split-brain. I've asked Tim Chen on SAMZA-375:

https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204

The last solution that I mentioned, using Samza directly (no dependency on Kafka, YARN, Mesos, etc.), seems like the best long-term solution to me. We can either 1) introduce a heartbeat message into the coordinator stream, or 2) use the existing checkpoint message as a heartbeat. There is some complexity to this solution that would need to be thought through, though. For example, should the heartbeat messages be sent from the main thread? What happens if the main thread is blocked on process() for an extended period of time?

What do others think? As a short-term fix, it seems to me like YARN/Mesos should handle this automatically for us. Has anyone had experience with orphaned containers in Mesos?

> I really appreciate if you give me some guideline about implementing
> custom cluster management interface of Samza.

Samza jobs are started through bin/run-job.sh (inside samza-shell). This CLI uses JobRunner to instantiate a StreamJobFactory (defined with job.factory.class), which returns a StreamJob. To implement your own cluster management, the first thing you'll need to do is implement StreamJobFactory and StreamJob. You can have a look at YarnJob or ProcessJob/ProcessJobFactory for an example of how to do this.

Note that this code has changed slightly between 0.8.0 and master (0.9.0). In 0.9.0, the partition-to-container assignment logic has been pulled out of YARN's AM and into a JobCoordinator class.
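To make that concrete, here's a very rough, hypothetical skeleton of what an EC2 ASG-flavored factory might look like. The package, class names, and everything EC2-related are made up, and I'm assuming StreamJobFactory.getJob(Config) plus a StreamJob with submit/kill/waitForFinish/waitForStatus/getStatus, which is roughly what the samza-api interfaces look like on master right now; double-check the exact signatures against the Samza version you're building against.

// Hypothetical skeleton only. The package, class names, and the EC2/ASG
// pieces are invented; the interfaces come from samza-api
// (org.apache.samza.job.StreamJobFactory and StreamJob).
package com.example.samza.asg;

import org.apache.samza.config.Config;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.job.StreamJob;
import org.apache.samza.job.StreamJobFactory;

public class Ec2AsgJobFactory implements StreamJobFactory {
  public StreamJob getJob(Config config) {
    // Pull whatever your cluster manager needs out of the job config here
    // (ASG name, instance count, etc.; all hypothetical keys).
    return new Ec2AsgJob(config);
  }
}

class Ec2AsgJob implements StreamJob {
  private final Config config;
  private volatile ApplicationStatus status = ApplicationStatus.New;

  Ec2AsgJob(Config config) {
    this.config = config;
  }

  public StreamJob submit() {
    // Launch one SamzaContainer per instance in the group. In 0.9.0 the
    // partition-to-container assignments come from the JobCoordinator
    // rather than YARN's AM, so this is where you'd hand each container
    // its assignment.
    status = ApplicationStatus.Running;
    return this;
  }

  public StreamJob kill() {
    // Tear the containers down / scale the group back in.
    status = ApplicationStatus.UnsuccessfulFinish;
    return this;
  }

  public ApplicationStatus waitForFinish(long timeoutMs) {
    // Block until the containers exit or the timeout elapses.
    return status;
  }

  public ApplicationStatus waitForStatus(ApplicationStatus desired, long timeoutMs) {
    // Block until the job reaches the desired status or the timeout elapses.
    return status;
  }

  public ApplicationStatus getStatus() {
    return status;
  }
}

You'd then point job.factory.class at it (e.g. job.factory.class=com.example.samza.asg.Ec2AsgJobFactory, again a made-up name), and run-job.sh/JobRunner would pick it up.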
The trick with adding EC2 ASG is going to be in handling partition shifting when a new node is added to the group. For example, if you have two machines, each running one container, and you add a third machine, some of the input partitions (and corresponding StreamTasks) need to be shifted from the two machines on to the third. The only way to do this right now is to:

1. Stop all containers.
2. Re-instantiate the JobCoordinator with a new container count.
3. Start new containers on all three machines with the new partition assignments.

In an ideal world, steps (1-3) would be handled automatically by Samza, and wouldn't require container restarts. This is precisely what samza-standalone will accomplish. If you're interested in contributing to samza-standalone, that would be awesome. I'm working on a design doc right now, which I'm trying to post by EOW. Once that's done, we can collaborate on design and split the code up, if you'd like.

Cheers,
Chris

On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:

>Hi Samza Devs
>
>The significant concern I got recently is, container leak. The data
>pipeline based on Samza can guarantee at least once delivery but the
>duplicate rate is over 1.0%, I am having alerts right now. Container leaks
>will push a lot of alerts to me.
>
>So, we need to find out running Samza on Mesos won't create that problem,
>or Spark Streaming won't have that issue. In the worst case, creating our
>own distribution coordination might be more predictable instead of running
>Yarn on EMR.
>
>What about standalone Samza? If this is quite plausible and the best
>solution in the near future, I want to be able to contribute. Could you
>share your thoughts or plans?
>
>I really appreciate if you give me some guideline about implementing
>custom
>cluster management interface of Samza. If it's possible, I want to take a
>look to replace Yarn support with EC2 ASG stuff.
>
>Thank you
>Best, Jae
