Hey Jae,

Here are my results from testing failure scenario (1), above. I started
hello-samza, ran a job, and then killed the RM. The NM hung around for a
while, and then killed the orphaned containers, and itself:

NM trying to reconnect to dead (or partitioned) RM:

criccomi-mn:incubator-samza-hello-samza criccomi$ jps
1650 SamzaAppMaster
1350 Kafka
1687 SamzaContainer
1321 NodeManager
461
1902 Jps
1247 QuorumPeerMain

NM decides to kill all of its containers, and itself:

criccomi-mn:incubator-samza-hello-samza criccomi$ jps
1925 Jps
1350 Kafka
461
1247 QuorumPeerMain

Here are the logs from the NM after killing the RM:

2015-01-22 12:33:22,611 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 0 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:33:23,612 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 1 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:33:24,613 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 2 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:33:25,615 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 3 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
....
2015-01-22 12:52:50,096 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 7 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:52:51,097 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 8 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)
2015-01-22 12:52:52,098 INFO org.apache.hadoop.ipc.Client: Retrying connect
to server: localhost/127.0.0.1:8031. Already tried 9 time(s); retry policy
is RetryUpToMaximumCountWithFixedSleep(maxRetries=10, sleepTime=1000
MILLISECONDS)

You can see that the NM kept retrying for roughly 20 minutes (12:33 to
12:52) before giving up. I believe this is tunable with configs in:

https://hadoop.apache.org/docs/r2.4.0/hadoop-yarn/hadoop-yarn-common/yarn-default.xml

Once the final timeout happens, the NM shuts all containers down, and kills
itself:

2015-01-22 12:52:52,217 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Applications still running : [application_1421958559415_0001]
2015-01-22 12:52:52,219 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl:
Waiting for Applications to be Finished
2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application:
Application application_1421958559415_0001 transitioned from RUNNING to
FINISHING_CONTAINERS_WAIT
2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
Container container_1421958559415_0001_01_000002 transitioned from RUNNING
to KILLING
2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container:
Container container_1421958559415_0001_01_000001 transitioned from RUNNING
to KILLING
2015-01-22 12:52:52,220 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
Cleaning up container container_1421958559415_0001_01_000002
2015-01-22 12:52:52,254 INFO
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch:
Cleaning up container container_1421958559415_0001_01_000001
2015-01-22 12:52:52,508 WARN
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit
code from container container_1421958559415_0001_01_000002 is : 137

When we implement samza-standalone, we will probably have to follow a very
similar procedure. If we detect a network split, we'll retry for a little
while, and then kill all containers to avoid having duplicates. I am
willing to bet that Mesos slaves follow exactly the same behavior when they
can't contact the master.

What I'm getting at here is that I think that this is pretty unavoidable.
The best you can do is wait a little while, and then kill the duplicate
(orphaned) containers.
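
As a rough illustration of that retry-then-kill idea, here is a minimal
Java sketch. It is not Samza or YARN code: the class name
PartitionWatchdog, the reachability check, and the kill hook are all
hypothetical placeholders, and the clock is injected only so the timeout
logic is easy to exercise.

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

/**
 * Hypothetical sketch: keep retrying contact with a coordinator for a
 * bounded time, then kill local containers to avoid running duplicates.
 */
class PartitionWatchdog {
    private final BooleanSupplier coordinatorReachable; // assumed health check
    private final Runnable killContainers;              // assumed shutdown hook
    private final LongSupplier clockMillis;             // injectable clock
    private final long maxWaitMillis;                   // how long to tolerate silence
    private long lastContactMillis;
    private boolean killed = false;

    PartitionWatchdog(BooleanSupplier coordinatorReachable,
                      Runnable killContainers,
                      LongSupplier clockMillis,
                      long maxWaitMillis) {
        this.coordinatorReachable = coordinatorReachable;
        this.killContainers = killContainers;
        this.clockMillis = clockMillis;
        this.maxWaitMillis = maxWaitMillis;
        this.lastContactMillis = clockMillis.getAsLong();
    }

    /** Call periodically. Returns true once the containers have been killed. */
    boolean tick() {
        if (killed) {
            return true;
        }
        long now = clockMillis.getAsLong();
        if (coordinatorReachable.getAsBoolean()) {
            // Successful contact resets the deadline.
            lastContactMillis = now;
        } else if (now - lastContactMillis >= maxWaitMillis) {
            // Unreachable for too long: assume a partition and shut down.
            killContainers.run();
            killed = true;
        }
        return killed;
    }
}
```

The max wait is the knob that defines your exposure: the longer it is,
the longer a duplicate container can run during a real partition, and the
shorter it is, the more likely a long GC pause gets treated as one.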

Cheers,
Chris

On Thu, Jan 22, 2015 at 12:42 PM, Chris Riccomini <[email protected]>
wrote:

> Hey Jae,
>
> Every resource manager has to solve the split-brain/orphaned container
> problem. There are several issues to check:
>
> 1. Simulate a network partition between the master (RM in YARN) and slave
> (NM in YARN).
> 2. `kill -9` the slave (NM in YARN).
>
> In YARN's case, I know for sure that (2) will result in the containers
> being leaked. The PPID on the container will be switched to 1. This is just
> how UNIX works. I suspect `kill -9`'ing the slave in Mesos will result in
> the same behavior.
>
> For (1), every distributed system has to solve this. How do you detect a
> real partition (vs. a long GC, for example), and when you do detect a
> partition, how do you react to it?
>
> I am testing (1) for YARN right now (using hello-samza, and killing the
> RM). I will let you know how it behaves shortly. I believe the NM retries
> connecting to the RM for some period of time, and then kills itself if
> it can't. If this is the case, then the container *would not be orphaned*.
> I also believe the retry count and wait time are tunable, so you can define
> your own exposure (e.g. you have a duplicate container for 1 minute, before
> the NM shuts itself down).
>
> Anecdotally, we've not seen leaked containers in YARN since we began
> properly shutting down NMs (not kill -9'ing them).
>
> > Depending on the time line among stabilizing stand alone and Mesos
> > support
>
> Regarding stabilizing standalone, I'm working on the design doc right now.
> A proposed sketch of a ZK-based implementation was posted on SAMZA-516
> yesterday. My goal is to get the design doc done by tomorrow. This would
> let us discuss and open subtasks next week, and start coding thereafter.
> Realistically, I think standalone can be committed before end of Q1, and
> should be usable. After a month or two of operation, I'd wager it'll be
> relatively stable. So, that puts things at mid-Q2.
>
> Cheers,
> Chris
>
> On Thu, Jan 22, 2015 at 12:24 PM, Bae, Jae Hyeon <[email protected]>
> wrote:
>
>> I read through SAMZA-375. We will do one more round PoC Samza on Mesos.
>>
>> On Thu, Jan 22, 2015 at 11:14 AM, Bae, Jae Hyeon <[email protected]>
>> wrote:
>>
>> > I asked Mantis guy about orphaned container in Mesos and he was almost
>> > sure that Mesos won't let that happen.
>> >
>> > How is https://issues.apache.org/jira/browse/SAMZA-375 going? Depending
>> > on the time line among stabilizing stand alone and Mesos support, our
>> > schedule or decision will be changed.
>> >
>> > Thank you
>> > Best, Jae
>> >
>> > On Wed, Jan 21, 2015 at 4:58 PM, Chris Riccomini <
>> > [email protected]> wrote:
>> >
>> >> Hey all,
>> >>
>> >> Also, just opened this ticket to track work on samza-standalone:
>> >>
>> >>   https://issues.apache.org/jira/browse/SAMZA-516
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 1/21/15 1:32 PM, "Chris Riccomini" <[email protected]> wrote:
>> >>
>> >> >Hey Jae,
>> >> >
>> >> >> So, we need to find out running Samza on Mesos won't create that
>> >> >>problem, or Spark Streaming won't have that issue. In the worst case,
>> >> >>creating our own distribution coordination might be more predictable
>> >> >>instead of running Yarn on EMR.
>> >> >
>> >> >I think that there are two ways to fix this. One is to have the Kafka
>> >> >broker detect that there are two producers that are "the same", and
>> >> >start dropping messages from the "old one" (and perhaps throw an
>> >> >exception to the old producer). The other way is to have the Samza
>> >> >container detect the problem, and kill itself.
>> >> >
>> >> >The kafka-based approach is a subset of the transactionality feature
>> >> >described here:
>> >> >
>> >> >https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+in+Kafka
>> >> >
>> >> >The problem with the Kafka approach is that 1) it's kafka-specific, and
>> >> >2) the generation id required to drop messages from an orphaned producer
>> >> >hasn't been implemented, except in a branch that's not been committed.
>> >> >
>> >> >So, if we accept that we shouldn't use Kafka as the solution for
>> >> >detecting orphaned containers, the solution will have to go into Samza.
>> >> >Within Samza, there are two approaches. One is to use the resource
>> >> >scheduler (YARN, Mesos, etc.) to detect the problem. The other solution
>> >> >is to use Samza, itself, to detect the problem.
>> >> >
>> >> >A YARN-specific example of how to solve the problem would be to have
>> >> >the SamzaContainer periodically poll its local NM's REST endpoint:
>> >> >
>> >> >  http://eat1-app1218.corp.linkedin.com:8042/ws/v1/node/info
>> >> >
>> >> >To see what the status is, its last update time, etc. If the REST
>> >> >endpoint can't be reached, the node is unhealthy, or the last update
>> >> >time is > some time interval, the container could kill itself. Again,
>> >> >this is YARN-specific.
>> >> >
>> >> >I am not sure how Mesos handles split-brain. I've asked Tim Chen on
>> >> >SAMZA-375:
>> >> >
>> >> >https://issues.apache.org/jira/browse/SAMZA-375?focusedCommentId=14286204&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14286204
>> >> >
>> >> >The last solution that I mentioned, using Samza directly (no dependency
>> >> >on Kafka, YARN, Mesos, etc), seems like the best long-term solution to
>> >> >me. We can either 1) introduce a heartbeat message into the coordinator
>> >> >stream, or 2) use the existing checkpoint message as a heartbeat. There
>> >> >is some complexity to this solution that would need to be thought
>> >> >through, though. For example, should the heartbeat messages be sent from
>> >> >the main thread? What happens if the main thread is blocked on process()
>> >> >for an extended period of time?
>> >> >
>> >> >What do others think? As a short-term fix, it seems to me like
>> >> >YARN/Mesos should handle this automatically for us. Has anyone had
>> >> >experience with orphaned containers in Mesos?
>> >> >
>> >> >> I really appreciate if you give me some guideline about implementing
>> >> >>custom cluster management interface of Samza.
>> >> >
>> >> >Samza jobs are started through bin/run-job.sh (inside samza-shell).
>> >> >This CLI uses JobRunner to instantiate a StreamJobFactory (defined with
>> >> >job.factory.class), which returns a StreamJob. To implement your own
>> >> >cluster management, the first thing you'll need to do is implement
>> >> >StreamJobFactory and StreamJob. You can have a look at YarnJob or
>> >> >ProcessJob/ProcessJobFactory for an example of how to do this.
>> >> >
>> >> >Note that this code has changed slightly between 0.8.0 and master
>> >> >(0.9.0). In 0.9.0, the partition-to-container assignment logic has been
>> >> >pulled out of YARN's AM, and into a JobCoordinator class.
>> >> >
>> >> >The trick with adding EC2 ASG is going to be in handling partition
>> >> >shifting when a new node is added to the group. For example, if you
>> >> >have two machines, each running one container, and you add a third
>> >> >machine, some of the input partitions (and corresponding StreamTasks)
>> >> >need to be shifted from the two machines on to the third. The only way
>> >> >to do this right now is to:
>> >> >
>> >> >1. Stop all containers.
>> >> >2. Re-instantiate the JobCoordinator with a new container count.
>> >> >3. Start new containers on all three machines with the new partition
>> >> >assignments.
>> >> >
>> >> >In an ideal world, steps (1-3) would be handled automatically by Samza,
>> >> >and wouldn't require container restarts. This is precisely what
>> >> >samza-standalone will accomplish. If you're interested in contributing
>> >> >to samza-standalone, that would be awesome. I'm working on a design doc
>> >> >right now, which I'm trying to post by EOW. Once that's done, we can
>> >> >collaborate on design and split the code up, if you'd like.
>> >> >
>> >> >
>> >> >Cheers,
>> >> >Chris
>> >> >
>> >> >On 1/21/15 1:14 PM, "Bae, Jae Hyeon" <[email protected]> wrote:
>> >> >
>> >> >>Hi Samza Devs
>> >> >>
>> >> >>The significant concern I got recently is, container leak. The data
>> >> >>pipeline based on Samza can guarantee at least once delivery but the
>> >> >>duplicate rate is over 1.0%, I am having alerts right now. Container
>> >> >>leaks will push a lot of alerts to me.
>> >> >>
>> >> >>So, we need to find out running Samza on Mesos won't create that
>> >> >>problem, or Spark Streaming won't have that issue. In the worst case,
>> >> >>creating our own distribution coordination might be more predictable
>> >> >>instead of running Yarn on EMR.
>> >> >>
>> >> >>What about standalone Samza? If this is quite plausible and the best
>> >> >>solution in the near future, I want to be able to contribute. Could
>> >> >>you share your thoughts or plans?
>> >> >>
>> >> >>I really appreciate if you give me some guideline about implementing
>> >> >>custom cluster management interface of Samza. If it's possible, I want
>> >> >>to take a look to replace Yarn support with EC2 ASG stuff.
>> >> >>
>> >> >>Thank you
>> >> >>Best, Jae
>> >> >
>> >>
>> >>
>> >
>>
>
>
