"Several different parties have asked to be able to run Samza as a standalone service without YARN, or any other scheduler. If we do this, we would need a way to (at least):
1) Detect when a container has failed. 2) Shift partitions from one container to another."

I'm also interested in running Samza without YARN and would be happy with static partition assignment. Do you really need to do those things if Samza is run outside a cluster manager? I'm thinking that if you want container failure detection and auto-shifting, you should use a cluster manager. For "simple" environments, you might be OK with static partition assignment, relying on monitoring to detect failures and on manual intervention to shift or restart.

On Mon, Jan 5, 2015 at 10:48 AM, Chris Riccomini <[email protected]> wrote:

> Hey all,
>
> There are two different failure scenarios being discussed here.
>
> ## NM failure
>
> Yi Su is talking about the situation where an NM fails (e.g. the host is lost, there is a network partition, etc.). In this case, yes, it will take 10 minutes, by default, for the RM to mark the NM as dead. This setting can be tuned with:
>
>     yarn.nm.liveness-monitor.expiry-interval-ms
>
> in yarn-site.xml. There is also an equivalent setting for AMs, but I have not observed Samza's AMs hanging--they typically either work properly, or fail catastrophically (in which case the NM notifies the RM of the failure). Dropping the expiry time should not have a negative impact on YARN grid performance. The NMs, by default, heartbeat once every second:
>
>     yarn.resourcemanager.nodemanagers.heartbeat-interval-ms
>
> So, dropping the timeout to, say, 30s seems reasonable. Barring large 30s GCs, this should be a relatively safe setting. Going much lower than this will likely exhibit flapping (as Yi observed when setting it to 1 second).
>
> ## Container failure
>
> The second case, which Yan is talking about, is when a failure occurs in the container, but the NM is still alive. In this case, the notification is immediate, and there is no 10-minute penalty.
>
> The second issue that Yi raises, pre-allocating reserve containers, would eliminate latency when starting new containers, but the observed latency is small: a few seconds, or less. Samza's goal is not to be a high-frequency-trading, sub-millisecond stream processing system. Kafka itself isn't even good for this. We allow for a minute or two of downtime during certain failure scenarios (to recover state, for instance). The philosophy is that Samza's use cases are all async, and so should be able to sustain a brief downtime during failover. If this is not desirable, synchronous RPC is probably a better solution for you.
>
> ## Container failure detection
>
> It seems possible that Samza will end up with failure detection code for its containers. Several different parties have asked to be able to run Samza as a standalone service without YARN, or any other scheduler. If we do this, we would need a way to (at least):
>
> 1) Detect when a container has failed.
> 2) Shift partitions from one container to another.
>
> I haven't thought about the best way to do this in great detail. The work would likely fall out of SAMZA-41, as a follow-on ticket. To support this, it seems possible that SAMZA-348 (the coordinator stream) could be used to handle heartbeats and coordination. I'm not sure when this will get done, though. If it falls on me, I will scope it out in February.
>
> Cheers,
> Chris
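For reference, a minimal yarn-site.xml sketch of the two settings Chris mentions, with the expiry dropped to the 30 seconds he suggests (the values are illustrative, not a tested recommendation):

    <property>
      <description>How long to wait until a node manager is considered dead.</description>
      <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
      <value>30000</value>
    </property>
    <property>
      <description>How often node managers heartbeat to the resource manager (the default is once per second).</description>
      <name>yarn.resourcemanager.nodemanagers.heartbeat-interval-ms</name>
      <value>1000</value>
    </property>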
> On 1/4/15 7:09 AM, "Milinda Pathirage" <[email protected]> wrote:
>
> > I think what Su has experienced is true in the case of a Node Manager failure. This was there in old Hadoop (Task Tracker failures); this paper [1] discusses its effects. I think this behavior is present for Node Manager failures in YARN too; that is what I found some time back (about a year ago) by going through the YARN code, but I am not sure whether it is still true now.
> >
> > Thanks
> > Milinda
> >
> > [1] http://www.cs.rice.edu/~fd2/pdf/hpdc106-dinu.pdf
> >
> > On Sat, Jan 3, 2015 at 11:24 PM, Yi Su <[email protected]> wrote:
> >
> >> Hi Fang,
> >>
> >> I have verified the failure detection issue. It takes 10 minutes to recover if I kill the Node Manager process first. I will detail the experiment, in case I have made any mistakes.
> >>
> >> The node arrangement is the same as before.
> >>
> >> Workload:
> >>     Every second, a Python program generates a record with the current system time and sends the record 10 times to the Kafka topic "suyi-test-input".
> >>
> >> Stream Task:
> >>     It gets the tuples from the input stream and sends them to the output stream "suyi-test-output". Checkpointing is disabled.
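As a rough illustration of the kind of generator described above (this is not Su Yi's actual script; kafka-python is assumed, and the broker address and record format are guessed from the AM logs and output samples quoted further down):

    import json
    import time

    from kafka import KafkaProducer  # assumes the kafka-python package

    # Once per second, send a record containing the current time to "suyi-test-input"
    # ten times, mirroring the workload described above.
    producer = KafkaProducer(
        bootstrap_servers="192.168.3.141:9092",  # broker address taken from the AM logs below
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )

    while True:
        record = {"time": str(float(int(time.time())))}  # e.g. {"time": "1420336775.0"}
        for _ in range(10):  # the same record is sent 10 times
            producer.send("suyi-test-input", record)
        producer.flush()
        time.sleep(1)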
> >> YARN Configuration:
> >>     <property>
> >>       <description>How long to wait until a node manager is considered dead.</description>
> >>       <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
> >>       <value>600000</value>
> >>     </property>
> >>     <property>
> >>       <description>How often to check that node managers are still alive.</description>
> >>       <name>yarn.resourcemanager.nm.liveness-monitor.interval-ms</name>
> >>       <value>1000</value>
> >>     </property>
> >>
> >> Experiment Process:
> >>     (1) I submitted the stream task; the application master ran on node "a", and the other container ran on node "b".
> >>     (2) I killed the Node Manager process on node "b".
> >>     (3) At about 09:59:07, I killed the container process on node "b".
> >>
> >> Results:
> >>     (1) At 10:09:33 the application master tried to redeploy the lost container, according to the application master logs.
> >>     (2) The output during 09:59:35 to 10:09:53 is lost.
> >>
> >> A fraction of the application master logs:
> >>     2015-01-04 09:47:20 ContainerManagementProtocolProxy [INFO] Opening proxy : b:35889
> >>     2015-01-04 09:47:20 SamzaAppMasterTaskManager [INFO] Claimed task ID 0 for container container_1420335573203_0001_01_000002 on node b (http://b:8042/node/containerlogs/container_1420335573203_0001_01_000002).
> >>     2015-01-04 09:47:20 SamzaAppMasterTaskManager [INFO] Started task ID 0
> >>     2015-01-04 09:57:19 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:192.168.3.141,port:9092 with correlation id 41 for 1 topic(s) Set(metrics)
> >>     2015-01-04 09:57:19 SyncProducer [INFO] Connected to 192.168.3.141:9092 for producing
> >>     2015-01-04 09:57:19 SyncProducer [INFO] Disconnecting from 192.168.3.141:9092
> >>     2015-01-04 09:57:19 SyncProducer [INFO] Disconnecting from a:9092
> >>     2015-01-04 09:57:19 SyncProducer [INFO] Connected to a:9092 for producing
> >>     2015-01-04 10:07:19 ClientUtils$ [INFO] Fetching metadata from broker id:0,host:192.168.3.141,port:9092 with correlation id 82 for 1 topic(s) Set(metrics)
> >>     2015-01-04 10:07:19 SyncProducer [INFO] Connected to 192.168.3.141:9092 for producing
> >>     2015-01-04 10:07:19 SyncProducer [INFO] Disconnecting from 192.168.3.141:9092
> >>     2015-01-04 10:07:19 SyncProducer [INFO] Disconnecting from a:9092
> >>     2015-01-04 10:07:19 SyncProducer [INFO] Connected to a:9092 for producing
> >>     2015-01-04 10:09:33 SamzaAppMasterTaskManager [INFO] Got an exit code of -100. This means that container container_1420335573203_0001_01_000002 was killed by YARN, either due to being released by the application master or being 'lost' due to node failures etc.
> >>     2015-01-04 10:09:33 SamzaAppMasterTaskManager [INFO] Released container container_1420335573203_0001_01_000002 was assigned task ID 0. Requesting a new container for the task.
> >>     2015-01-04 10:09:33 SamzaAppMasterTaskManager [INFO] Requesting 1 container(s) with 1024mb of memory
> >>
> >> A fraction of the output, with my comments added:
> >>     {"time":"1420336774.0"}
> >>     {"time":"1420336774.0"}
> >>     {"time":"1420336774.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"}
> >>     {"time":"1420336775.0"} \\ 09:59:35
> >>     {"time":"1420337393.0"} \\ 10:09:53
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337393.0"}
> >>     {"time":"1420337394.0"}
> >>     {"time":"1420337394.0"}
> >>     {"time":"1420337394.0"}
> >>     {"time":"1420337394.0"}
> >>     {"time":"1420337394.0"}
> >>
> >> Regarding the task redeployment issue, I worry that if the Resource Manager is busy, or there are no available containers in the system, redeployment of the failed task might be delayed.
> >>
> >> Thank you for your help.
> >>
> >> Su Yi
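A quick back-of-the-envelope check on the gap in the output above (plain Python; the two timestamps are copied from the samples):

    last_before_gap = 1420336775.0   # 09:59:35, last record before the outage
    first_after_gap = 1420337393.0   # 10:09:53, first record after the outage
    print(first_after_gap - last_before_gap)         # 618.0 seconds
    print((first_after_gap - last_before_gap) / 60)  # ~10.3 minutes

The roughly 10.3-minute gap lines up with the 600000 ms (10 minute) yarn.nm.liveness-monitor.expiry-interval-ms used in the experiment, plus a short window to request and start the replacement container.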
> >> On Sat, 03 Jan 2015 06:04:20 +0800, Yan Fang <[email protected]> wrote:
> >>
> >>> Hi Su Yi,
> >>>
> >>> I think there may be a misunderstanding. For failure detection, if the containers die (because of an NM failure or whatever reason), the AM will bring up new containers on the same NM or a different NM according to resource availability. It does not take as much as 10 minutes to recover. One way you can test this is to run a Samza job and manually kill the NM or the thread to see how quickly it recovers. In terms of how yarn.nm.liveness-monitor.expiry-interval-ms plays a role here, I am not very sure; hopefully a YARN expert in the community can explain it a little.
> >>>
> >>> The goal of the standby container in SAMZA-406 is to recover quickly when the task has a lot of local state and reading the changelog therefore takes a long time, not to reduce the time of *allocating* the container, which, I believe, is taken care of by YARN.
> >>>
> >>> Hope this helps a little. Thanks.
> >>>
> >>> Cheers,
> >>>
> >>> Fang, Yan
> >>> [email protected]
> >>> +1 (206) 849-4108
> >>>
> >>> On Thu, Jan 1, 2015 at 4:20 AM, Su Yi <[email protected]> wrote:
> >>>
> >>>> Hi Timothy,
> >>>>
> >>>> There are 4 nodes in total: a, b, c, d
> >>>> Resource manager: a
> >>>> Node managers: a, b, c, d
> >>>> Kafka and ZooKeeper running on: a
> >>>>
> >>>> The YARN configuration is:
> >>>>
> >>>>     <property>
> >>>>       <description>How long to wait until a node manager is considered dead.</description>
> >>>>       <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
> >>>>       <value>1000</value>
> >>>>     </property>
> >>>>
> >>>>     <property>
> >>>>       <description>How often to check that node managers are still alive.</description>
> >>>>       <name>yarn.resourcemanager.nm.liveness-monitor.interval-ms</name>
> >>>>       <value>100</value>
> >>>>     </property>
> >>>>
> >>>> From the Samza web UI, I found that node "a" appeared and disappeared from the node list again and again.
> >>>>
> >>>> Su Yi
> >>>>
> >>>> On 2015-01-01 02:54:48, "Timothy Chen" <[email protected]> wrote:
> >>>>
> >>>> > Hi Su Yi,
> >>>> >
> >>>> > Can you elaborate a bit more on what you mean by an unstable cluster when you configured the heartbeat interval to be 1s?
> >>>> >
> >>>> > Tim
> >>>> >
> >>>> > On Wed, Dec 31, 2014 at 10:30 AM, Su Yi <[email protected]> wrote:
> >>>> >> Hello,
> >>>> >>
> >>>> >> Here are some thoughts about HA of Samza.
> >>>> >>
> >>>> >> 1. Failure detection
> >>>> >>
> >>>> >> The problem is that failure detection of containers in Samza depends entirely on YARN. YARN counts on the Node Manager to report container failures, but the Node Manager can fail too (for example, if the machine fails, the NM fails with it). Node Manager failures can be detected through heartbeats by the Resource Manager, but by default it takes 10 minutes to confirm a Node Manager failure. I think that is OK for batch processing, but not for stream processing.
> >>>> >>
> >>>> >> Configuring the YARN failure-confirmation interval to 1s results in an unstable YARN cluster (4 nodes in total). With 2s, everything works fine, but it takes 10s~20s to get a lost container (machine shut down) back. Considering that the test stream task is very simple (stateless), the recovery time is relatively long.
> >>>> >>
> >>>> >> I am not an expert on YARN, and I don't know why it takes such a long time, by default, to confirm a node failure. To my understanding, YARN tries to be general-purpose, and that is not sufficient for a stream processing framework. Extra effort beyond YARN is needed for failure detection in stream processing.
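To make the "extra effort beyond YARN" idea (and Chris's suggestion above that the coordinator stream could carry heartbeats) a bit more concrete, here is a purely illustrative sketch of heartbeat-based container failure detection over a Kafka topic. None of this is an existing Samza API; the topic name, message format, and timeout are invented for illustration, and kafka-python is assumed:

    import json
    import time

    from kafka import KafkaConsumer  # assumes the kafka-python package

    HEARTBEAT_TOPIC = "samza-heartbeats"  # hypothetical topic each container would write to
    TIMEOUT_SECONDS = 30                  # declare a container dead after 30s of silence

    last_seen = {}  # container id -> local time of the last heartbeat seen

    consumer = KafkaConsumer(
        HEARTBEAT_TOPIC,
        bootstrap_servers="localhost:9092",
        value_deserializer=lambda v: json.loads(v.decode("utf-8")),
        consumer_timeout_ms=1000,  # iteration below returns after ~1s with no messages
    )

    while True:
        for msg in consumer:  # drain whatever heartbeats have arrived
            last_seen[msg.value["container"]] = time.time()
        now = time.time()
        for container, ts in last_seen.items():
            if now - ts > TIMEOUT_SECONDS:
                # A real implementation would shift the dead container's partitions to a
                # healthy container here; this sketch only reports the failure.
                print("container %s has missed heartbeats for %.0f seconds" % (container, now - ts))

Each container would publish a small JSON heartbeat (for example {"container": "0"}) to the same topic every second or so; anything beyond that, such as partition reassignment, is exactly the follow-on work Chris describes around SAMZA-41 and SAMZA-348.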
> >>>> >> 2. Task redeployment
> >>>> >>
> >>>> >> After the Resource Manager informs Samza of a container failure, Samza has to request resources from YARN to redeploy the failed tasks, which consumes time during recovery, and recovery time is critical for HA in stream processing. I think maintaining a few standby containers could eliminate this overhead: Samza could deploy failed tasks on the standby containers rather than requesting new ones from YARN.
> >>>> >>
> >>>> >> Hot standby containers, which are described in SAMZA-406 (https://issues.apache.org/jira/browse/SAMZA-406), may help save recovery time, but they are costly (they double the resources needed).
> >>>> >>
> >>>> >> I'm wondering what these ideas mean to you, and how feasible they are. By the way, I'm using Samza 0.7.
> >>>> >>
> >>>> >> Thank you for reading.
> >>>> >>
> >>>> >> Happy New Year! ;-)
> >>>> >>
> >>>> >> Su Yi
> >
> > --
> > Milinda Pathirage
> >
> > PhD Student | Research Assistant
> > School of Informatics and Computing | Data to Insight Center
> > Indiana University
> >
> > twitter: milindalakmal
> > skype: milinda.pathirage
> > blog: http://milinda.pathirage.org
