Hey Roger,

> Do you really need to do those things if Samza is run outside a cluster
>manager?  I'm thinking that if you want container failure detection and
>auto-shifting, you should use a cluster manager.  For "simple"
>environments, you might be ok with static partition assignment and
>relying on monitoring to detect failure and manual intervention to shift
>or restart.

I am still debating this. I tend to agree with you. The simple thing to do
is just have static partition assignment, as SAMZA-41 proposes. This will
certainly get done. This solution will probably be satisfactory for some,
but not for others.

For those who want failure detection/failover, we could just tell them to
use YARN/Mesos, or we could provide some simple out-of-the-box solution.
Maybe integration with Helix (http://helix.apache.org/) would be a good
compromise. We might be able to keep this code totally separate from the
rest of Samza, so that it doesn't impact complexity. In any case, it's
worth investigating, even if we opt not to do it.

Cheers,
Chris

On 1/5/15 11:09 PM, "Roger Hoover" <[email protected]> wrote:

>"Several different parties have asked to be able to run
>Samza as a standalone service without YARN, or any other scheduler. If we
>do this, we would need a way to (at least):
>
>  1) Detect when a container has failed.
>  2) Shift partitions from one container to another."
>
>I'm also interested in running Samza without YARN and would be happy with
>static partition assignment.  Do you really need to do those things if
>Samza is run outside a cluster manager?  I'm thinking that if you want
>container failure detection and auto-shifting, you should use a cluster
>manager.  For "simple" environments, you might be ok with static partition
>assignment and relying on monitoring to detect failure and manual
>intervention to shift or restart.
>
>On Mon, Jan 5, 2015 at 10:48 AM, Chris Riccomini <[email protected]> wrote:
>
>> Hey all,
>>
>> There are two different failure scenarios being discussed here.
>>
>> ## NM failure
>>
>> Yi Su is talking about the situation where an NM fails (e.g., the host is
>> lost, there is a network partition, etc.). In this case, yes, it will take
>> 10 minutes, by default, for the RM to mark the NM as dead. This setting
>> can be tuned with:
>>
>>   yarn.nm.liveness-monitor.expiry-interval-ms
>>
>> In yarn-site.xml. There is also an equivalent setting for AMs, but I have
>> not observed Samza's AMs hanging--they typically either work properly, or
>> fail catastrophically (in which case the NM notifies the RM of the
>> failure). Dropping the expiry time should not have a negative impact on
>> YARN grid performance. The NMs, by default, heartbeat once every second:
>>
>>   yarn.resourcemanager.nodemanagers.heartbeat-interval-ms
>>
>>
>> So, dropping the timeout to, say, 30s seems reasonable. Barring large 30s
>> GCs, this should be a relatively safe setting. Going much lower than this
>> will likely cause flapping (as Yi observed when setting it to 1 second).
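>>
>> For example (a sketch only, not a tested recommendation), the two settings
>> above could go into yarn-site.xml roughly like this:
>>
>>   <property>
>>     <!-- RM waits this long before declaring an NM dead (default 600000). -->
>>     <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
>>     <value>30000</value>
>>   </property>
>>   <property>
>>     <!-- How often NMs heartbeat to the RM (default 1000 ms). -->
>>     <name>yarn.resourcemanager.nodemanagers.heartbeat-interval-ms</name>
>>     <value>1000</value>
>>   </property>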
>>
>> ## Container failure
>>
>> The second case, which Yan is talking about, is when a failure occurs in
>> the container, but the NM is still alive. In this case, the notification
>> is immediate, and there is no 10 minute penalty.
>>
>> The second issue that Yi raises, pre-allocating reserve containers, would
>> eliminate latency when starting new containers, but the observed latency
>> is small: a few seconds, or less. Samza's goal is not to be a
>> high-frequency trading sub-millisecond stream processing system. Kafka,
>> itself, isn't even good for this. We allow for a minute or two of downtime
>> during certain failure scenarios (to recover state, for instance). The
>> philosophy is that Samza's use cases are all async, and so should be able
>> to sustain a brief downtime during failover. If this is not desirable,
>> synchronous RPC is probably a better solution for you.
>>
>> ## Container failure detection
>>
>> It seems possible that Samza will end up with failure detection code for
>> its containers. Several different parties have asked to be able to run
>> Samza as a standalone service without YARN, or any other scheduler. If we
>> do this, we would need a way to (at least):
>>
>>   1) Detect when a container has failed.
>>   2) Shift partitions from one container to another.
>>
>> I haven't thought about the best way to do this in great detail. The work
>> would likely fall out of SAMZA-41, as a follow-on ticket. To support this,
>> it seems possible that SAMZA-348 (the coordinator stream) could be used to
>> handle heartbeats and coordination. I'm not sure when this will get done,
>> though. If it falls on me, I will scope it out in February.
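>>
>> As a very rough sketch of the heartbeat side of this (invented names, and
>> not the actual SAMZA-348 API), the detection logic could look something
>> like the following: containers periodically write heartbeats, and the
>> coordinator marks a container dead once its heartbeat is overdue:
>>
>>   import java.util.Map;
>>   import java.util.concurrent.ConcurrentHashMap;
>>
>>   public class ContainerHeartbeatMonitor {
>>     private final long expiryMs;
>>     // container ID -> time (ms) of the last heartbeat seen on the stream
>>     private final Map<String, Long> lastHeartbeat =
>>       new ConcurrentHashMap<String, Long>();
>>
>>     public ContainerHeartbeatMonitor(long expiryMs) {
>>       this.expiryMs = expiryMs;
>>     }
>>
>>     // Called whenever a heartbeat message for a container is consumed.
>>     public void onHeartbeat(String containerId) {
>>       lastHeartbeat.put(containerId, System.currentTimeMillis());
>>     }
>>
>>     // A container is considered dead once its heartbeat is overdue; its
>>     // partitions would then be shifted to another container.
>>     public boolean isDead(String containerId) {
>>       Long last = lastHeartbeat.get(containerId);
>>       return last == null || System.currentTimeMillis() - last > expiryMs;
>>     }
>>   }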
>>
>> Cheers,
>> Chris
>>
>> On 1/4/15 7:09 AM, "Milinda Pathirage" <[email protected]> wrote:
>>
>> >I think what Su has experienced is true in the case of Node Manager
>> >failure. This was there in old Hadoop (Task Tracker failures); this [1]
>> >paper discusses its effects. I think this behavior exists for Node
>> >Manager failures in YARN too; that's what I found some time back (about
>> >a year ago) by going through the YARN code. But I am not sure whether it
>> >is still true now.
>> >
>> >Thanks
>> >Milinda
>> >
>> >[1] http://www.cs.rice.edu/~fd2/pdf/hpdc106-dinu.pdf
>> >
>> >On Sat, Jan 3, 2015 at 11:24 PM, Yi Su <[email protected]> wrote:
>> >
>> >> Hi Fang,
>> >>
>> >> I have verified the failure detection issue. It takes 10 minutes for
>> >> recovery if I kill the Node Manager process first. I will detail the
>> >> experiment, in case I have made any mistakes.
>> >>
>> >> The node arrangement is the same as before.
>> >>
>> >> Workload :
>> >>         Every second, a Python program generates a record with the
>> >> current system time and sends the record 10 times to the Kafka topic
>> >> "suyi-test-input".
>> >>
>> >> Stream Task :
>> >>         It gets the tuples from the input stream and sends them to the
>> >> output stream "suyi-test-output". Checkpointing is disabled.
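>> >>
>> >>         As a rough illustration (not the exact code used in this
>> >> experiment; the class name is invented), such a pass-through task with
>> >> the Samza StreamTask API could look like:
>> >>
>> >>   import org.apache.samza.system.IncomingMessageEnvelope;
>> >>   import org.apache.samza.system.OutgoingMessageEnvelope;
>> >>   import org.apache.samza.system.SystemStream;
>> >>   import org.apache.samza.task.MessageCollector;
>> >>   import org.apache.samza.task.StreamTask;
>> >>   import org.apache.samza.task.TaskCoordinator;
>> >>
>> >>   public class ForwardTask implements StreamTask {
>> >>     private static final SystemStream OUTPUT =
>> >>       new SystemStream("kafka", "suyi-test-output");
>> >>
>> >>     public void process(IncomingMessageEnvelope envelope,
>> >>                         MessageCollector collector,
>> >>                         TaskCoordinator coordinator) {
>> >>       // Forward each incoming message unchanged to suyi-test-output.
>> >>       collector.send(new OutgoingMessageEnvelope(OUTPUT, envelope.getMessage()));
>> >>     }
>> >>   }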
>> >>
>> >> YARN Configuration :
>> >>         <property>
>> >>                 <description>How long to wait until a node manager is
>> >> considered dead.</description>
>> >>                 <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
>> >>                 <value>600000</value>
>> >>         </property>
>> >>         <property>
>> >>                 <description>How often to check that node managers are
>> >> still alive.</description>
>> >>                 <name>yarn.resourcemanager.nm.liveness-monitor.interval-ms</name>
>> >>                 <value>1000</value>
>> >>         </property>
>> >>
>> >> Experiment Process :
>> >>         (1) I submitted the stream task; the application master ran on
>> >> node "a", and the other container ran on node "b".
>> >>         (2) I killed the node manager process on node "b".
>> >>         (3) At about 09:59:07, I killed the container process on node
>> >>"b".
>> >>
>> >> Results :
>> >>         (1) At 10:09:33 the application master tried to redeploy the
>> >> lost container, according to the application master logs.
>> >>         (2) The output from 09:59:35 to 10:09:53 was lost.
>> >>
>> >> An excerpt from the application master logs:
>> >>         2015-01-04 09:47:20 ContainerManagementProtocolProxy [INFO]
>> >> Opening proxy : b:35889
>> >>         2015-01-04 09:47:20 SamzaAppMasterTaskManager [INFO] Claimed
>> >>task
>> >> ID 0 for container container_1420335573203_0001_01_000002 on node b (
>> >> http://b:8042/node/containerlogs/container_1420335573203_0001_01_000002 ).
>> >>         2015-01-04 09:47:20 SamzaAppMasterTaskManager [INFO] Started
>> >>task
>> >> ID 0
>> >>         2015-01-04 09:57:19 ClientUtils$ [INFO] Fetching metadata
>>from
>> >> broker id:0,host:192.168.3.141,port:9092 with correlation id 41 for 1
>> >> topic(s) Set(metrics)
>> >>         2015-01-04 09:57:19 SyncProducer [INFO] Connected to
>> >> 192.168.3.141:9092 for producing
>> >>         2015-01-04 09:57:19 SyncProducer [INFO] Disconnecting from
>> >> 192.168.3.141:9092
>> >>         2015-01-04 09:57:19 SyncProducer [INFO] Disconnecting from
>> >>a:9092
>> >>         2015-01-04 09:57:19 SyncProducer [INFO] Connected to a:9092
>>for
>> >> producing
>> >>         2015-01-04 10:07:19 ClientUtils$ [INFO] Fetching metadata
>>from
>> >> broker id:0,host:192.168.3.141,port:9092 with correlation id 82 for 1
>> >> topic(s) Set(metrics)
>> >>         2015-01-04 10:07:19 SyncProducer [INFO] Connected to
>> >> 192.168.3.141:9092 for producing
>> >>         2015-01-04 10:07:19 SyncProducer [INFO] Disconnecting from
>> >> 192.168.3.141:9092
>> >>         2015-01-04 10:07:19 SyncProducer [INFO] Disconnecting from
>> >>a:9092
>> >>         2015-01-04 10:07:19 SyncProducer [INFO] Connected to a:9092
>>for
>> >> producing
>> >>         2015-01-04 10:09:33 SamzaAppMasterTaskManager [INFO] Got an
>>exit
>> >> code of -100. This means that container
>> >>container_1420335573203_0001_01_000002
>> >> was killed by YARN, either due to being released by the application
>> >>master
>> >> or being 'lost' due to node failures etc.
>> >>         2015-01-04 10:09:33 SamzaAppMasterTaskManager [INFO] Released
>> >> container container_1420335573203_0001_01_000002 was assigned task
>>ID 0.
>> >> Requesting a new container for the task.
>> >>         2015-01-04 10:09:33 SamzaAppMasterTaskManager [INFO]
>>Requesting
>> >>1
>> >> container(s) with 1024mb of memory
>> >>
>> >> An excerpt from the output, with my comments added:
>> >>         {"time":"1420336774.0"}
>> >>         {"time":"1420336774.0"}
>> >>         {"time":"1420336774.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"}
>> >>         {"time":"1420336775.0"} \\ 09:59:35
>> >>         {"time":"1420337393.0"} \\ 10:09:53
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337393.0"}
>> >>         {"time":"1420337394.0"}
>> >>         {"time":"1420337394.0"}
>> >>         {"time":"1420337394.0"}
>> >>         {"time":"1420337394.0"}
>> >>         {"time":"1420337394.0"}
>> >>
>> >> For the task redeployment issue, I worry that if the Resource Manager
>> >> is busy or there are no available containers in the system, redeployment
>> >> of the failed task might be delayed.
>> >>
>> >> Thank you for your help.
>> >>
>> >> Su Yi
>> >>
>> >>
>> >> On Sat, 03 Jan 2015 06:04:20 +0800, Yan Fang <[email protected]>
>> >>wrote:
>> >>
>> >>  Hi Su Yi,
>> >>>
>> >>> I think there may be a misunderstanding. For failure detection, if the
>> >>> containers die (because of NM failure or whatever reason), the AM will
>> >>> bring up new containers on the same NM or on a different NM according
>> >>> to resource availability. It does not take as long as 10 minutes to
>> >>> recover. One way you can test this is to run a Samza job and manually
>> >>> kill the NM or the thread to see how quickly it recovers. In terms of
>> >>> how yarn.nm.liveness-monitor.expiry-interval-ms plays a role here, I'm
>> >>> not very sure. I hope a YARN expert in the community can explain it a
>> >>> little.
>> >>>
>> >>> The goal of the standby container in SAMZA-406 is to recover quickly
>> >>> when the task has a lot of local state and reading the changelog takes
>> >>> a long time, not to reduce the time spent *allocating* the container,
>> >>> which, I believe, is taken care of by YARN.
>> >>>
>> >>> Hope this helps a little. Thanks.
>> >>>
>> >>> Cheers,
>> >>>
>> >>> Fang, Yan
>> >>> [email protected]
>> >>> +1 (206) 849-4108
>> >>>
>> >>> On Thu, Jan 1, 2015 at 4:20 AM, Su Yi <[email protected]> wrote:
>> >>>
>> >>>  Hi Timothy,
>> >>>>
>> >>>> There are 4 nodes in total : a,b,c,d
>> >>>> Resource manager : a
>> >>>> Node manager : a,b,c,d
>> >>>> Kafka and zookeeper running on : a
>> >>>>
>> >>>> YARN configuration is :
>> >>>>
>> >>>> <property>
>> >>>>     <description>How long to wait until a node manager is considered
>> >>>> dead.</description>
>> >>>>     <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
>> >>>>     <value>1000</value>
>> >>>> </property>
>> >>>>
>> >>>> <property>
>> >>>>     <description>How often to check that node managers are still
>> >>>> alive.</description>
>> >>>>     <name>yarn.resourcemanager.nm.liveness-monitor.interval-ms</name>
>> >>>>     <value>100</value>
>> >>>> </property>
>> >>>>
>> >>>> From the Samza web UI, I found that node 'a' appeared and disappeared
>> >>>> again and again in the node list.
>> >>>>
>> >>>> Su Yi
>> >>>>
>> >>>> On 2015-01-01 02:54:48,"Timothy Chen" <[email protected]> wrote:
>> >>>>
>> >>>> >Hi Su Yi,
>> >>>> >
>> >>>> >Can you elaborate a bit more on what you mean by an unstable cluster
>> >>>> >when you configured the heartbeat interval to be 1s?
>> >>>> >
>> >>>> >Tim
>> >>>> >
>> >>>> >On Wed, Dec 31, 2014 at 10:30 AM, Su Yi <[email protected]> wrote:
>> >>>> >> Hello,
>> >>>> >>
>> >>>> >> Here are some thoughts about HA of Samza.
>> >>>> >>
>> >>>> >> 1. Failure detection
>> >>>> >>
>> >>>> >> The problem is that, in Samza, container failure detection depends
>> >>>> >> completely on YARN. YARN counts on the Node Manager to report
>> >>>> >> container failures; however, the Node Manager could fail, too (for
>> >>>> >> example, if the machine fails, the NM fails with it). Node Manager
>> >>>> >> failures can be detected through heartbeats by the Resource Manager,
>> >>>> >> but by default it takes 10 minutes to confirm a Node Manager
>> >>>> >> failure. I think that's OK for batch processing, but not for stream
>> >>>> >> processing.
>> >>>> >>
>> >>>> >> Configuring the YARN failure-confirmation interval to 1s results
>> >>>> >> in an unstable YARN cluster (4 nodes in total). With 2s, everything
>> >>>> >> works fine, but it takes 10s~20s to get a lost container (machine
>> >>>> >> shut down) back. Considering that the test stream task is very
>> >>>> >> simple (stateless), the recovery time is relatively long.
>> >>>> >>
>> >>>> >> I am not an expert on YARN, so I don't know why, by default, it
>> >>>> >> takes such a long time to confirm node failure. To my understanding,
>> >>>> >> YARN is trying to be general, and it is not sufficient for a stream
>> >>>> >> processing framework. Extra effort beyond YARN is needed for failure
>> >>>> >> detection in stream processing.
>> >>>> >>
>> >>>> >> 2. Task redeployment
>> >>>> >>
>> >>>> >> After the Resource Manager informs Samza of a container failure,
>> >>>> >> Samza has to apply for resources from YARN to redeploy the failed
>> >>>> >> tasks, which consumes time during recovery. And recovery time is
>> >>>> >> critical for HA in stream processing. I think maintaining a few
>> >>>> >> standby containers may eliminate this overhead on recovery time:
>> >>>> >> Samza could deploy failed tasks on the standby containers rather
>> >>>> >> than requesting new ones from YARN.
>> >>>> >>
>> >>>> >> Hot standby containers, which are described in SAMZA-406 (
>> >>>> >> https://issues.apache.org/jira/browse/SAMZA-406), may help save
>> >>>> >> recovery time; however, they are costly (they double the resources
>> >>>> >> needed).
>> >>>> >>
>> >>>> >> I'm wondering what these ideas mean to you, and how feasible they
>> >>>> >> are. By the way, I'm using Samza 0.7.
>> >>>> >>
>> >>>> >> Thank you for reading.
>> >>>> >>
>> >>>> >> Happy New Year!;-)
>> >>>> >>
>> >>>> >> Su Yi
>> >>>>
>> >>>
>> >>
>> >
>> >
>> >--
>> >Milinda Pathirage
>> >
>> >PhD Student | Research Assistant
>> >School of Informatics and Computing | Data to Insight Center
>> >Indiana University
>> >
>> >twitter: milindalakmal
>> >skype: milinda.pathirage
>> >blog: http://milinda.pathirage.org
>>
>>
