Hey Jae,

No problem. This is actually an interesting study. :) Please post your findings with Mesos.
Cheers,
Chris

On Thu, Jan 22, 2015 at 1:53 PM, Bae, Jae Hyeon <[email protected]> wrote:

> Thank you so much. This was really helpful!
>
> On Thu, Jan 22, 2015 at 1:04 PM, Chris Riccomini <[email protected]> wrote:
>
>> Hey Jae,
>>
>> Here are my results from testing failure scenario (1), above. I started hello-samza, ran a job, and then killed the RM. The NM hung around for a while, and then killed the orphaned containers, and itself:
>>
>> NM trying to reconnect to dead (or partitioned) RM:
>>
>> criccomi-mn:incubator-samza-hello-samza criccomi$ jps
>> 1650 SamzaAppMaster
>> 1350 Kafka
>> 1687 SamzaContainer
>> 1321 NodeManager
>> 461
>> 1902 Jps
>> 1247 QuorumPeerMain
>>
>> NM decides to kill all of its containers, and itself:
>>
>> criccomi-mn:incubator-samza-hello-samza criccomi$ jps
>> 1925 Jps
>> 1350 Kafka
>> 461
>> 1247 QuorumPeerMain
>>
>> Here are the logs from the NM after killing the RM:
>>
>> 2015-01-22 12:33:22,611 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:8031. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
>> 2015-01-22 12:33:23,612 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:8031. Already tried 1 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
>> 2015-01-22 12:33:24,613 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:8031. Already tried 2 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
>> 2015-01-22 12:33:25,615 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:8031. Already tried 3 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
>> ....
>> 2015-01-22 12:52:50,096 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:8031. Already tried 7 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
>> 2015-01-22 12:52:51,097 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:8031. Already tried 8 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
>> 2015-01-22 12:52:52,098 INFO org.apache.hadoop.ipc.Client: Retrying connect to server: localhost/127.0.0.1:8031. Already tried 9 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000 MILLISECONDS)
>>
>> You can see that the NM ran for 20 minutes.
>> I believe this is tunable with configs in:
>>
>> https://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml
>>
>> Once the final timeout happens, the NM shuts all containers down, and kills itself:
>>
>> 2015-01-22 12:52:52,217 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Applications still running : [application_1421958559415_0001]
>> 2015-01-22 12:52:52,219 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Waiting for Applications to be Finished
>> 2015-01-22 12:52:52,220 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application: Application application_1421958559415_0001 transitioned from RUNNING to FINISHING_CONTAINERS_WAIT
>> 2015-01-22 12:52:52,220 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1421958559415_0001_01_000002 transitioned from RUNNING to KILLING
>> 2015-01-22 12:52:52,220 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1421958559415_0001_01_000001 transitioned from RUNNING to KILLING
>> 2015-01-22 12:52:52,220 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1421958559415_0001_01_000002
>> 2015-01-22 12:52:52,254 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch: Cleaning up container container_1421958559415_0001_01_000001
>> 2015-01-22 12:52:52,508 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1421958559415_0001_01_000002 is : 137
>>
>> When we implement samza-standalone, we will probably have to follow a very similar procedure. If we detect a network split, we'll retry for a little while, and then kill all containers to avoid having duplicates. I am willing to bet that Mesos slaves follow exactly the same behavior when they can't contact the master.
>>
>> What I'm getting at here is that I think this is pretty unavoidable. The best you can do is wait a little while, and then kill the duplicate (orphaned) containers.
>>
>> Cheers,
>> Chris
>>
>> On Thu, Jan 22, 2015 at 12:42 PM, Chris Riccomini <[email protected]> wrote:
>>
>> > Hey Jae,
>> >
>> > Every resource manager has to solve the split-brain/orphaned-container problem. There are several issues to check:
>> >
>> > 1. Simulate a network partition between the master (RM in YARN) and slave (NM in YARN).
>> > 2. `kill -9` the slave (NM in YARN).
>> >
>> > In YARN's case, I know for sure that (2) will result in the containers being leaked. The PPID of the container will be switched to 1. This is just how UNIX works. I suspect `kill -9`'ing the slave in Mesos will result in the same behavior.
>> >
>> > For (1), every distributed system has to solve this: how do you detect a real partition (vs. a long GC, for example), and when you do detect a partition, how do you react to it?
>> >
>> > I am testing (1) for YARN right now (using hello-samza, and killing the RM). I will let you know how it behaves shortly. I believe it retries to connect to the RM for some period of time, and then the NM kills itself if it can't. If this is the case, then the container *would not be orphaned*. I also believe the retry count and wait time are tunable, so you can define your own exposure (e.g. you have a duplicate container for 1 minute before the NM shuts itself down).
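For reference, I believe the knobs that bound how long the NM keeps retrying the RM before giving up are yarn.resourcemanager.connect.max-wait.ms and yarn.resourcemanager.connect.retry-interval.ms (worth double-checking against the yarn-default.xml page linked above for your Hadoop version). Something like this in yarn-site.xml would shrink the ~20-minute window in the logs above to about a minute:

    <property>
      <name>yarn.resourcemanager.connect.max-wait.ms</name>
      <value>60000</value>   <!-- give up on the RM after ~1 minute -->
    </property>
    <property>
      <name>yarn.resourcemanager.connect.retry-interval.ms</name>
      <value>10000</value>   <!-- retry every 10 seconds -->
    </property>

The trade-off is the one described above: a shorter window means less duplicate-container exposure, but also less tolerance for an RM that is only briefly unreachable.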
>> >
>> > Anecdotally, we've not seen leaked containers in YARN since we began properly shutting down NMs (not kill -9'ing them).
>> >
>> > > Depending on the timeline for stabilizing standalone and Mesos support
>> >
>> > Regarding stabilizing standalone, I'm working on the design doc right now. A proposed sketch of a ZK-based implementation was posted on SAMZA-516 yesterday. My goal is to get the design doc done by tomorrow. This would let us discuss and open subtasks next week, and start coding thereafter. Realistically, I think standalone can be committed before the end of Q1, and should be usable. After a month or two of operation, I'd wager it'll be relatively stable. So, that puts things at mid-Q2.
>> >
>> > Cheers,
>> > Chris
>> >
>> > On Thu, Jan 22, 2015 at 12:24 PM, Bae, Jae Hyeon <[email protected]> wrote:
>> >
>> >> I read through SAMZA-375. We will do one more round of PoC with Samza on Mesos.
>> >>
>> >> On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <[email protected]> wrote:
>> >>
>> >> > I asked the Mantis guy about orphaned containers in Mesos and he was almost sure that Mesos won't let that happen.
>> >> >
>> >> > How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending on the timeline for stabilizing standalone and Mesos support, our schedule or decision will change.
>> >> >
>> >> > Thank you
>> >> > Best, Jae
>> >> >
>> >> > On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <[email protected]> wrote:
>> >> >
>> >> >> Hey all,
>> >> >>
>> >> >> Also, just opened this ticket to track work on samza-standalone:
>> >> >>
>> >> >> https://issues.apache.org/jira/browse/SAMZA-516
>> >> >>
>> >> >> Cheers,
>> >> >> Chris
>> >> >>
>> >> >> On 1/21/15 1:32 PM, "Chris Riccomini" <[email protected]> wrote:
>> >> >>
>> >> >> > Hey Jae,
>> >> >> >
>> >> >> >> So, we need to find out whether running Samza on Mesos will create that problem, or whether Spark Streaming will have that issue. In the worst case, creating our own distributed coordination might be more predictable instead of running Yarn on EMR.
>> >> >> >
>> >> >> > I think that there are two ways to fix this. One is to have the Kafka broker detect that there are two producers that are "the same", and start dropping messages from the "old" one (and perhaps throw an exception to the old producer). The other way is to have the Samza container detect the problem, and kill itself.
>> >> >> >
>> >> >> > The Kafka-based approach is a subset of the transactionality feature described here:
>> >> >> >
>> >> >> > https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>> >> >> >
>> >> >> > The problem with the Kafka approach is that 1) it's Kafka-specific, and 2) the generation id required to drop messages from an orphaned producer hasn't been implemented, except in a branch that's not been committed.
>> >> >> >
>> >> >> > So, if we accept that we shouldn't use Kafka as the solution for detecting orphaned containers, the solution will have to go into Samza.
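As a side note, one scheduler-agnostic check a container could run is to notice that it has been re-parented to init (PID 1), which is what happens on UNIX when the process that launched it is kill -9'd, as mentioned earlier in this thread. This is just a rough, untested illustration, not anything Samza does today (ProcessHandle needs Java 9+, and the class name is made up):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical orphan watchdog: if our parent process disappears (on Unix we get
    // re-parented to PID 1), assume the supervisor is gone and shut down rather than
    // keep running as a duplicate container.
    public class OrphanWatchdog {
      public static void start(Runnable onOrphaned) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(r -> {
          Thread t = new Thread(r, "orphan-watchdog");
          t.setDaemon(true);
          return t;
        });
        exec.scheduleAtFixedRate(() -> {
          long ppid = ProcessHandle.current().parent().map(ProcessHandle::pid).orElse(1L);
          if (ppid == 1L) {
            System.err.println("Parent process is gone; stopping to avoid a duplicate container.");
            onOrphaned.run();
          }
        }, 10, 10, TimeUnit.SECONDS);
      }

      public static void main(String[] args) throws InterruptedException {
        start(() -> System.exit(1));
        Thread.sleep(Long.MAX_VALUE); // stand-in for the real container work
      }
    }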
>> >> >> >
>> >> >> > Within Samza, there are two approaches. One is to use the resource scheduler (YARN, Mesos, etc.) to detect the problem. The other is to use Samza itself to detect the problem.
>> >> >> >
>> >> >> > A YARN-specific example of how to solve the problem would be to have the SamzaContainer periodically poll its local NM's REST endpoint:
>> >> >> >
>> >> >> > http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>> >> >> >
>> >> >> > to see what the status is, its last update time, etc. If the REST endpoint can't be reached, the node is unhealthy, or the last update time is older than some threshold, the container could kill itself. Again, this is YARN-specific.
>> >> >> >
>> >> >> > I am not sure how Mesos handles split-brain. I've asked Tim Chen on SAMZA-375:
>> >> >> >
>> >> >> > https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204
>> >> >> >
>> >> >> > The last solution that I mentioned, using Samza directly (no dependency on Kafka, YARN, Mesos, etc.), seems like the best long-term solution to me. We can either 1) introduce a heartbeat message into the coordinator stream, or 2) use the existing checkpoint message as a heartbeat. There is some complexity to this solution that would need to be thought through, though. For example, should the heartbeat messages be sent from the main thread? What happens if the main thread is blocked on process() for an extended period of time?
>> >> >> >
>> >> >> > What do others think? As a short-term fix, it seems to me like YARN/Mesos should handle this automatically for us. Has anyone had experience with orphaned containers in Mesos?
>> >> >> >
>> >> >> >> I would really appreciate it if you could give me some guidelines on implementing a custom cluster management interface for Samza.
>> >> >> >
>> >> >> > Samza jobs are started through bin/run-job.sh (inside samza-shell). This CLI uses JobRunner to instantiate a StreamJobFactory (defined with job.factory.class), which returns a StreamJob. To implement your own cluster management, the first thing you'll need to do is implement StreamJobFactory and StreamJob. You can have a look at YarnJob or ProcessJob/ProcessJobFactory for an example of how to do this.
>> >> >> >
>> >> >> > Note that this code has changed slightly between 0.8.0 and master (0.9.0). In 0.9.0, the partition-to-container assignment logic has been pulled out of YARN's AM, and into a JobCoordinator class.
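To make that concrete, a custom job factory is roughly this shape. This is a from-memory sketch, so treat the method signatures as approximate and check StreamJobFactory/StreamJob in your Samza version; Ec2AsgJobFactory and Ec2AsgJob are made-up names:

    package com.example.samza;  // hypothetical package

    import org.apache.samza.config.Config;
    import org.apache.samza.job.ApplicationStatus;
    import org.apache.samza.job.StreamJob;
    import org.apache.samza.job.StreamJobFactory;

    // Hypothetical factory, wired in with job.factory.class=com.example.samza.Ec2AsgJobFactory
    public class Ec2AsgJobFactory implements StreamJobFactory {
      @Override
      public StreamJob getJob(Config config) {
        return new Ec2AsgJob(config);
      }
    }

    // Hypothetical job: each method maps onto whatever your cluster manager (an EC2 ASG
    // here) uses to launch, poll, and tear down SamzaContainer processes.
    class Ec2AsgJob implements StreamJob {
      private final Config config;

      Ec2AsgJob(Config config) { this.config = config; }

      @Override
      public StreamJob submit() {
        // Scale the ASG and start one SamzaContainer per instance.
        return this;
      }

      @Override
      public StreamJob kill() {
        // Stop the containers (and possibly the ASG) again.
        return this;
      }

      @Override
      public ApplicationStatus waitForFinish(long timeoutMs) {
        // Block until the job exits or the timeout passes.
        return ApplicationStatus.SuccessfulFinish;
      }

      @Override
      public ApplicationStatus waitForStatus(ApplicationStatus status, long timeoutMs) {
        // Block until the job reaches the given status or the timeout passes.
        return status;
      }

      @Override
      public ApplicationStatus getStatus() {
        return ApplicationStatus.Running;
      }
    }

JobRunner then essentially calls getJob(config).submit(), so everything EC2-specific can stay behind these two interfaces.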
>> >> >> >
>> >> >> > The trick with adding EC2 ASG is going to be in handling partition shifting when a new node is added to the group. For example, if you have two machines, each running one container, and you add a third machine, some of the input partitions (and corresponding StreamTasks) need to be shifted from the two machines onto the third. The only way to do this right now is to:
>> >> >> >
>> >> >> > 1. Stop all containers.
>> >> >> > 2. Re-instantiate the JobCoordinator with a new container count.
>> >> >> > 3. Start new containers on all three machines with the new partition assignments.
>> >> >> >
>> >> >> > In an ideal world, steps (1-3) would be handled automatically by Samza, and wouldn't require container restarts. This is precisely what samza-standalone will accomplish. If you're interested in contributing to samza-standalone, that would be awesome. I'm working on a design doc right now, which I'm trying to post by EOW. Once that's done, we can collaborate on design and split the code up, if you'd like.
>> >> >> >
>> >> >> > Cheers,
>> >> >> > Chris
>> >> >> >
>> >> >> > On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:
>> >> >> >
>> >> >> >> Hi Samza Devs
>> >> >> >>
>> >> >> >> The significant concern I have run into recently is container leaks. The data pipeline based on Samza can guarantee at-least-once delivery, but the duplicate rate is over 1.0%, so I am getting alerts right now. Container leaks will push a lot of alerts to me.
>> >> >> >>
>> >> >> >> So, we need to find out whether running Samza on Mesos will create that problem, or whether Spark Streaming will have that issue. In the worst case, creating our own distributed coordination might be more predictable instead of running Yarn on EMR.
>> >> >> >>
>> >> >> >> What about standalone Samza? If this is quite plausible and the best solution in the near future, I want to be able to contribute. Could you share your thoughts or plans?
>> >> >> >>
>> >> >> >> I would really appreciate it if you could give me some guidelines on implementing a custom cluster management interface for Samza. If possible, I want to take a look at replacing the YARN support with EC2 ASG stuff.
>> >> >> >>
>> >> >> >> Thank you
>> >> >> >> Best, Jae
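One more illustration, for the NM REST polling idea from earlier in the thread: an untested sketch of a health-check loop a container could run against its local NM. The port (8042) and the /ws/v1/node/info path come from the example above; the poll interval and failure threshold are arbitrary, and a stricter check could also parse the JSON body instead of only looking at the response code:

    import java.net.HttpURLConnection;
    import java.net.URL;

    // Untested sketch: poll the local NodeManager's REST endpoint and exit after too
    // many consecutive failures, on the theory that an unreachable NM means this
    // container may already be orphaned.
    public class NodeManagerHealthCheck {
      public static void main(String[] args) throws InterruptedException {
        final String nmInfoUrl = "http://localhost:8042/ws/v1/node/info";
        final int maxConsecutiveFailures = 6;   // e.g. 6 * 10s = 1 minute of exposure
        int failures = 0;

        while (true) {
          try {
            HttpURLConnection conn = (HttpURLConnection) new URL(nmInfoUrl).openConnection();
            conn.setConnectTimeout(2000);
            conn.setReadTimeout(2000);
            if (conn.getResponseCode() == 200) {
              failures = 0;  // NM looks alive
            } else {
              failures++;
            }
            conn.disconnect();
          } catch (Exception e) {
            failures++;
          }
          if (failures >= maxConsecutiveFailures) {
            System.err.println("Local NM unreachable for too long; shutting down to avoid a duplicate container.");
            System.exit(1);
          }
          Thread.sleep(10_000);
        }
      }
    }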
