Hey all,

Also, just opened this ticket to track work on samza-standalone:

https://issues.apache.org/jira/browse/SAMZA-516

Cheers,
Chris

On 1/21/15 1:32 PM, "Chris Riccomini" <[email protected]> wrote:

>Hey Jae,
>
>> So, we need to find out running Samza on Mesos won't create that
>>problem, or Spark Streaming won't have that issue. In the worst case,
>>creating our own distribution coordination might be more predictable
>>instead of running Yarn on EMR.
>
>I think that there are two ways to fix this. One is to have the Kafka
>broker detect that there are two producers that are "the same", and start
>dropping messages from the "old one" (and perhaps throw an exception to
>the old producer). The other way is to have the Samza container detect the
>problem, and kill itself.
>
>The kafka-based approach is a subset of the transactionality feature
>described here:
>
>https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>
>The problem with the Kafka approach is that 1) it's kafka-specific, and 2)
>the generation id required to drop messages from an orphaned producer
>hasn't been implemented, except in a branch that's not been committed.
>
>So, if we accept that we shouldn't use Kafka as the solution for detecting
>orphaned containers, the solution will have to go into Samza. Within
>Samza, there are two approaches. One is to use the resource scheduler
>(YARN, Mesos, etc.) to detect the problem. The other solution is to use
>Samza, itself, to detect the problem.
>
>A YARN-specific example of how to solve the problem would be to have the
>SamzaContainer periodically poll its local NM's REST endpoint:
>
>  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>
>To see what the status is, its last update time, etc. If the REST endpoint
>can't be reached, the node is unhealthy, or the last update time is > some
>time interval, the container could kill itself. Again, this is
>YARN-specific.
>
>I am not sure how Mesos handles split-brain.
>I've asked Tim Chen on SAMZA-375:
>
>https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204
>
>The last solution that I mentioned, using Samza directly (no dependency on
>Kafka, YARN, Mesos, etc), seems like the best long-term solution to me. We
>can either 1) introduce a heartbeat message into the coordinator stream,
>or 2) use the existing checkpoint message as a heartbeat. There is some
>complexity to this solution that would need to be thought through, though.
>For example, should the heartbeat messages be sent from the main thread?
>What happens if the main thread is blocked on process() for an extended
>period of time?
>
>What do others think? As a short-term fix, it seems to me like YARN/Mesos
>should handle this automatically for us. Has anyone had experience with
>orphaned containers in Mesos?
>
>> I really appreciate if you give me some guideline about implementing
>>custom cluster management interface of Samza.
>
>Samza jobs are started through bin/run-job.sh (inside samza-shell). This
>CLI uses JobRunner to instantiate a StreamJobFactory (defined with
>job.factory.class), which returns a StreamJob. To implement your own
>cluster management, the first thing you'll need to do is implement
>StreamJobFactory and StreamJob. You can have a look at YarnJob or
>ProcessJob/ProcessJobFactory for an example of how to do this.
>
>Note that this code has changed slightly between 0.8.0 and master (0.9.0).
>In 0.9.0, the partition-to-container assignment logic has been pulled out
>of YARN's AM, and into a JobCoordinator class.
>
>The trick with adding EC2 ASG is going to be in handling partition
>shifting when a new node is added to the group.
>For example, if you have two machines, each running one container, and
>you add a third machine, some of the input partitions (and corresponding
>StreamTasks) need to be shifted from the two machines on to the third.
>The only way to do this right now is to:
>
>1. Stop all containers.
>2. Re-instantiate the JobCoordinator with a new container count.
>3. Start new containers on all three machines with the new partition
>assignments.
>
>In an ideal world, steps (1-3) would be handled automatically by Samza,
>and wouldn't require container restarts. This is precisely what
>samza-standalone will accomplish. If you're interested in contributing to
>samza-standalone, that would be awesome. I'm working on a design doc right
>now, which I'm trying to post by EOW. Once that's done, we can collaborate
>on design and split the code up, if you'd like.
>
>
>Cheers,
>Chris
>
>On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:
>
>>Hi Samza Devs
>>
>>The significant concern I got recently is, container leak. The data
>>pipeline based on Samza can guarantee at least once delivery but the
>>duplicate rate is over 1.0%, I am having alerts right now. Container
>>leaks will push a lot of alerts to me.
>>
>>So, we need to find out running Samza on Mesos won't create that problem,
>>or Spark Streaming won't have that issue. In the worst case, creating our
>>own distribution coordination might be more predictable instead of
>>running Yarn on EMR.
>>
>>What about standalone Samza? If this is quite plausible and the best
>>solution in the near future, I want to be able to contribute. Could you
>>share your thoughts or plans?
>>
>>I really appreciate if you give me some guideline about implementing
>>custom cluster management interface of Samza. If it's possible, I want
>>to take a look to replace Yarn support with EC2 ASG stuff.
>>
>>Thank you
>>Best, Jae
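
For reference, the YARN-specific check described in the quoted reply (poll the local NM, and give up if it is unreachable, unhealthy, or stale) could be sketched roughly as follows. This is only an illustration, not Samza code: it assumes the NM's /ws/v1/node/info response is JSON with a "nodeInfo" object carrying "nodeHealthy" and "lastNodeUpdateTime" (epoch milliseconds) fields, and the function names, the localhost URL, and the 60-second threshold are all hypothetical.

```python
import json
import time
import urllib.request

# Hypothetical defaults; neither value comes from Samza or YARN.
NM_INFO_URL = "http://localhost:8042/ws/v1/node/info"
MAX_STALENESS_MS = 60_000


def fetch_node_info(url=NM_INFO_URL, timeout_s=5.0):
    """Return the parsed NM response, or None if the endpoint can't be read."""
    try:
        with urllib.request.urlopen(url, timeout=timeout_s) as resp:
            return json.load(resp)
    except (OSError, ValueError):
        # OSError covers connection failures and timeouts;
        # ValueError covers a body that is not valid JSON.
        return None


def should_self_terminate(response, now_ms, max_staleness_ms=MAX_STALENESS_MS):
    """Apply the three conditions from the thread to one polled response."""
    # 1) The REST endpoint can't be reached (or returned something unusable).
    if not isinstance(response, dict):
        return True
    info = response.get("nodeInfo", {})
    # 2) The node reports itself as unhealthy.
    if not info.get("nodeHealthy", False):
        return True
    # 3) The last update time is older than the allowed interval.
    last_update_ms = info.get("lastNodeUpdateTime")
    if last_update_ms is None:
        return True
    return now_ms - last_update_ms > max_staleness_ms


def check_once():
    """One polling step; a container would call this on a timer."""
    return should_self_terminate(fetch_node_info(), int(time.time() * 1000))
```

A container would presumably run check_once() from a background thread rather than the main loop, so that a long process() call can't delay the check, and exit when it returns True. The same question about the main thread applies to the heartbeat idea raised in the thread.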
