I read through SAMZA-375. We will do one more round of PoC for Samza on Mesos.

On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <[email protected]> wrote:
> I asked the Mantis guy about orphaned containers in Mesos, and he was almost sure that Mesos won't let that happen.
>
> How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending on the timeline for stabilizing standalone mode and Mesos support, our schedule or decision may change.
>
> Thank you
> Best, Jae
>
> On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <[email protected]> wrote:
>
>> Hey all,
>>
>> Also, I just opened this ticket to track work on samza-standalone:
>>
>> https://issues.apache.org/jira/browse/SAMZA-516
>>
>> Cheers,
>> Chris
>>
>> On 1/21/15 1:32 PM, "Chris Riccomini" <[email protected]> wrote:
>>
>> >Hey Jae,
>> >
>> >> So, we need to find out whether running Samza on Mesos avoids that problem, or whether Spark Streaming has the same issue. In the worst case, building our own distributed coordination might be more predictable than running YARN on EMR.
>> >
>> >I think that there are two ways to fix this. One is to have the Kafka broker detect that there are two producers that are "the same", and start dropping messages from the "old one" (and perhaps throw an exception to the old producer). The other way is to have the Samza container detect the problem and kill itself.
>> >
>> >The Kafka-based approach is a subset of the transactionality feature described here:
>> >
>> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>> >
>> >The problem with the Kafka approach is that 1) it's Kafka-specific, and 2) the generation id required to drop messages from an orphaned producer hasn't been implemented, except in a branch that hasn't been committed.
>> >
>> >So, if we accept that we shouldn't use Kafka to detect orphaned containers, the solution will have to go into Samza. Within Samza, there are two approaches. One is to use the resource scheduler (YARN, Mesos, etc.) to detect the problem.
>> >The other solution is to use Samza itself to detect the problem.
>> >
>> >A YARN-specific example of how to solve the problem would be to have the SamzaContainer periodically poll its local NM's REST endpoint:
>> >
>> > http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>> >
>> >to see what the node's status is, its last update time, etc. If the REST endpoint can't be reached, the node is unhealthy, or the last update time is older than some threshold, the container could kill itself. Again, this is YARN-specific.
>> >
>> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on SAMZA-375:
>> >
>> > https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204
>> >
>> >The last solution that I mentioned, using Samza directly (no dependency on Kafka, YARN, Mesos, etc.), seems like the best long-term solution to me. We can either 1) introduce a heartbeat message into the coordinator stream, or 2) use the existing checkpoint message as a heartbeat. There is some complexity to this solution that would need to be thought through, though. For example, should the heartbeat messages be sent from the main thread? What happens if the main thread is blocked on process() for an extended period of time?
>> >
>> >What do others think? As a short-term fix, it seems to me that YARN/Mesos should handle this automatically for us. Has anyone had experience with orphaned containers in Mesos?
>> >
>> >> I would really appreciate it if you could give me some guidelines on implementing a custom cluster management interface for Samza.
>> >
>> >Samza jobs are started through bin/run-job.sh (inside samza-shell). This CLI uses JobRunner to instantiate a StreamJobFactory (defined with job.factory.class), which returns a StreamJob.
>> >To implement your own cluster management, the first thing you'll need to do is implement StreamJobFactory and StreamJob. Have a look at YarnJob or ProcessJob/ProcessJobFactory for an example of how to do this.
>> >
>> >Note that this code has changed slightly between 0.8.0 and master (0.9.0). In 0.9.0, the partition-to-container assignment logic has been pulled out of YARN's AM and into a JobCoordinator class.
>> >
>> >The trick with adding EC2 ASG is going to be handling partition shifting when a new node is added to the group. For example, if you have two machines, each running one container, and you add a third machine, some of the input partitions (and corresponding StreamTasks) need to be shifted from the two machines onto the third. The only way to do this right now is to:
>> >
>> >1. Stop all containers.
>> >2. Re-instantiate the JobCoordinator with a new container count.
>> >3. Start new containers on all three machines with the new partition assignments.
>> >
>> >In an ideal world, steps 1-3 would be handled automatically by Samza and wouldn't require container restarts. This is precisely what samza-standalone will accomplish. If you're interested in contributing to samza-standalone, that would be awesome. I'm working on a design doc right now, which I'm trying to post by EOW. Once that's done, we can collaborate on the design and split up the code, if you'd like.
>> >
>> >Cheers,
>> >Chris
>> >
>> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:
>> >
>> >>Hi Samza Devs,
>> >>
>> >>The most significant concern I have had recently is container leaks. The data pipeline based on Samza can guarantee at-least-once delivery, but the duplicate rate is over 1.0%, and I am getting alerts right now. Container leaks will push a lot of alerts to me.
>> >>
>> >>So, we need to find out whether running Samza on Mesos avoids that problem, or whether Spark Streaming has the same issue. In the worst case, building our own distributed coordination might be more predictable than running YARN on EMR.
>> >>
>> >>What about standalone Samza? If it is plausible and the best solution in the near future, I want to contribute. Could you share your thoughts or plans?
>> >>
>> >>I would really appreciate it if you could give me some guidelines on implementing a custom cluster management interface for Samza. If possible, I want to look into replacing YARN support with EC2 ASG.
>> >>
>> >>Thank you
>> >>Best, Jae
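Chris's first option, having the Kafka broker drop messages from the "old" copy of a producer, depends on a generation id. The sketch below illustrates that idea with a hypothetical in-memory fence. `GenerationFence` and `accept` are invented for illustration and are not Kafka APIs. It assumes that every restart of a logical producer, such as a Samza container, receives a strictly larger generation number.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical broker-side fence (not a Kafka API): remembers the highest
 * generation seen per logical producer and rejects writes from older ones.
 */
class GenerationFence {
    private final Map<String, Long> latestGeneration = new HashMap<>();

    /**
     * Returns true if the write is accepted. A write whose generation is lower
     * than the highest one already seen for the same producer id comes from an
     * orphaned producer and is rejected.
     */
    synchronized boolean accept(String producerId, long generation) {
        Long current = latestGeneration.get(producerId);
        if (current != null && generation < current) {
            return false; // the "old one": drop the message (a broker could also return an error)
        }
        latestGeneration.put(producerId, generation);
        return true;
    }
}
```

For example, once container-0 restarts with generation 2, any writes still arriving from the orphaned generation-1 instance are rejected, while generation-2 writes continue to be accepted.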
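The YARN-specific watchdog Chris describes, in which a container polls its local NM and kills itself when the node looks lost, could look roughly like the sketch below. It makes several assumptions:

- Java 11+ (`java.net.http`) is available.
- The `/ws/v1/node/info` response contains the `nodeHealthy` and `lastNodeUpdateTime` fields documented for the NodeManager REST API.

The sketch uses a regex rather than a JSON library only to stay self-contained. `NodeManagerWatchdog`, the thresholds, and the choice to halt the JVM are all hypothetical.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical watchdog: a container polls its local NodeManager and kills itself if the node looks lost. */
class NodeManagerWatchdog {
    private static final Pattern HEALTHY = Pattern.compile("\"nodeHealthy\"\\s*:\\s*(true|false)");
    private static final Pattern LAST_UPDATE = Pattern.compile("\"lastNodeUpdateTime\"\\s*:\\s*(\\d+)");

    /** Decision logic. nodeInfoJson is the body of GET /ws/v1/node/info, or null if unreachable. */
    static boolean shouldKillSelf(String nodeInfoJson, long nowMs, long maxStalenessMs) {
        if (nodeInfoJson == null) {
            return true; // REST endpoint could not be reached
        }
        Matcher healthy = HEALTHY.matcher(nodeInfoJson);
        Matcher lastUpdate = LAST_UPDATE.matcher(nodeInfoJson);
        if (!healthy.find() || !lastUpdate.find()) {
            return true; // unexpected payload: treated as unhealthy (a policy choice)
        }
        if (!Boolean.parseBoolean(healthy.group(1))) {
            return true; // NM reports the node as unhealthy
        }
        long lastUpdateMs = Long.parseLong(lastUpdate.group(1));
        return nowMs - lastUpdateMs > maxStalenessMs; // last update is too old
    }

    /** Fetches node info from the NM; returns null on I/O failure or a non-200 status. */
    static String fetchNodeInfo(HttpClient client, String nmBaseUrl) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(nmBaseUrl + "/ws/v1/node/info"))
                .header("Accept", "application/json")
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
        try {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            return response.statusCode() == 200 ? response.body() : null;
        } catch (IOException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /** Starts polling on a daemon thread, e.g. start("http://localhost:8042", 10_000, 60_000). */
    static ScheduledExecutorService start(String nmBaseUrl, long periodMs, long maxStalenessMs) {
        HttpClient client = HttpClient.newBuilder().connectTimeout(Duration.ofSeconds(5)).build();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "nm-watchdog");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleAtFixedRate(() -> {
            String body = fetchNodeInfo(client, nmBaseUrl);
            if (shouldKillSelf(body, System.currentTimeMillis(), maxStalenessMs)) {
                // halt() skips shutdown hooks, so a wedged process still terminates.
                Runtime.getRuntime().halt(1);
            }
        }, periodMs, periodMs, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

Keeping `shouldKillSelf` free of I/O makes the kill policy testable without a running NodeManager.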
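For the Samza-only heartbeat option, Chris asks what happens when the main thread is blocked in process(). One possible answer is sketched below. Heartbeats are sent from a separate timer thread but carry the last time the main loop made progress. A receiver can then tell "alive and working" apart from "alive but stuck in process()". Each heartbeat also carries a generation, so a container can notice that a newer copy of itself is running.

Everything here is hypothetical: `Heartbeat`, `HeartbeatTracker`, the generation field, and the thresholds. None of it is an existing coordinator-stream message type, and actually publishing to the coordinator stream is left out.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

/** Hypothetical heartbeat payload; not an existing Samza coordinator-stream message. */
final class Heartbeat {
    final String containerName;
    final long generation;     // assumed to increase every time the container is (re)launched
    final long sentAtMs;
    final long lastProgressMs; // last time the main loop returned from process()

    Heartbeat(String containerName, long generation, long sentAtMs, long lastProgressMs) {
        this.containerName = containerName;
        this.generation = generation;
        this.sentAtMs = sentAtMs;
        this.lastProgressMs = lastProgressMs;
    }
}

class HeartbeatTracker {
    private final String containerName;
    private final long generation;
    private final LongSupplier clock;
    private final AtomicLong lastProgressMs;

    HeartbeatTracker(String containerName, long generation, LongSupplier clock) {
        this.containerName = containerName;
        this.generation = generation;
        this.clock = clock;
        this.lastProgressMs = new AtomicLong(clock.getAsLong());
    }

    /** Called by the main thread each time process() returns. */
    void markProgress() {
        lastProgressMs.set(clock.getAsLong());
    }

    /** Called from a timer thread, so a long process() call cannot suppress heartbeats. */
    Heartbeat nextHeartbeat() {
        return new Heartbeat(containerName, generation, clock.getAsLong(), lastProgressMs.get());
    }

    /** True if another instance of this container, with a newer generation, is heartbeating. */
    boolean isOrphanedBy(Heartbeat observed) {
        return observed.containerName.equals(containerName) && observed.generation > generation;
    }

    /** Receiver-side check: heartbeats still arrive, but process() has not returned recently. */
    static boolean isStuck(Heartbeat hb, long maxProcessMs) {
        return hb.sentAtMs - hb.lastProgressMs > maxProcessMs;
    }
}
```

Separating "is the process alive" (heartbeats arrive) from "is it making progress" (`lastProgressMs` advances) lets the policy for a long-running process() be decided by the receiver rather than hard-coded in the container.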
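The extension point Chris describes runs from bin/run-job.sh to JobRunner, then to the factory named by job.factory.class, which returns a job. It can be sketched as below.

To keep the sketch self-contained, `SketchJobFactory` and `SketchJob` are simplified stand-ins, not Samza's `StreamJobFactory` and `StreamJob`. The real interfaces live in samza-api, take a Samza `Config`, and have more methods; YarnJob and ProcessJobFactory show their real shape. `Ec2AsgJobFactory`, `Ec2AsgJob`, `SketchJobRunner`, and every config key other than job.factory.class are hypothetical.

```java
import java.util.Map;

/** Simplified stand-in for Samza's StreamJob (the real one has more methods). */
interface SketchJob {
    SketchJob submit();
    SketchJob kill();
    String getStatus();
}

/** Simplified stand-in for Samza's StreamJobFactory (the real one takes a Samza Config). */
interface SketchJobFactory {
    SketchJob getJob(Map<String, String> config);
}

/** Hypothetical EC2 Auto Scaling Group backend. */
class Ec2AsgJobFactory implements SketchJobFactory {
    @Override
    public SketchJob getJob(Map<String, String> config) {
        // Both keys below are assumptions made for this sketch.
        String group = config.getOrDefault("job.ec2.asg.name", "samza-workers");
        int containers = Integer.parseInt(config.getOrDefault("job.container.count", "1"));
        return new Ec2AsgJob(group, containers);
    }
}

class Ec2AsgJob implements SketchJob {
    private final String group;
    private final int containerCount;
    private String status = "New";

    Ec2AsgJob(String group, int containerCount) {
        this.group = group;
        this.containerCount = containerCount;
    }

    @Override
    public SketchJob submit() {
        // A real backend would compute the partition-to-container assignment
        // (JobCoordinator in 0.9.0) and launch containerCount containers in the
        // ASG named by group; both steps are omitted here.
        status = "Running";
        return this;
    }

    @Override
    public SketchJob kill() {
        // A real backend would terminate the group's containers here.
        status = "Killed";
        return this;
    }

    @Override
    public String getStatus() {
        return status;
    }

    String getGroup() { return group; }

    int getContainerCount() { return containerCount; }
}

/** Mimics the JobRunner flow: instantiate the factory named by job.factory.class reflectively. */
class SketchJobRunner {
    static SketchJob run(Map<String, String> config) throws ReflectiveOperationException {
        String factoryClass = config.get("job.factory.class");
        if (factoryClass == null) {
            throw new IllegalArgumentException("job.factory.class is not set");
        }
        SketchJobFactory factory = (SketchJobFactory) Class.forName(factoryClass)
                .getDeclaredConstructor()
                .newInstance();
        return factory.getJob(config).submit();
    }
}
```

Loading the factory by class name from configuration is what lets a new cluster manager plug in without changes to the launcher itself.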
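The partition-shifting problem behind Chris's three-step procedure can be made concrete with a toy assignment. The sketch assumes a simple round-robin scheme in which partition p goes to container p % containerCount. This is an illustration only and not necessarily the grouping Samza's JobCoordinator uses. `PartitionReassignment` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy round-robin assignment used to show which partitions move when containers are added. */
class PartitionReassignment {
    /** Maps container index -> partitions it owns, with partition p on container p % containerCount. */
    static Map<Integer, List<Integer>> assign(int partitionCount, int containerCount) {
        Map<Integer, List<Integer>> byContainer = new TreeMap<>();
        for (int c = 0; c < containerCount; c++) {
            byContainer.put(c, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            byContainer.get(p % containerCount).add(p);
        }
        return byContainer;
    }

    /** Partitions whose owning container changes when going from oldCount to newCount containers. */
    static List<Integer> moved(int partitionCount, int oldCount, int newCount) {
        List<Integer> result = new ArrayList<>();
        for (int p = 0; p < partitionCount; p++) {
            if (p % oldCount != p % newCount) {
                result.add(p);
            }
        }
        return result;
    }
}
```

Consider 6 partitions moving from two containers to three. `assign(6, 3)` gives `{0=[0, 3], 1=[1, 4], 2=[2, 5]}`, and `moved(6, 2, 3)` gives `[2, 3, 4, 5]`. Only partitions 2 and 5 land on the new container, yet partitions 3 and 4 also swap between the two existing ones. Because ownership changes on machines that were not added, every container is affected. That is consistent with the current procedure of stopping and restarting all of them.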
