James Xu created STORM-153:
------------------------------

             Summary: Scaling storm beyond 1.2k workers
                 Key: STORM-153
                 URL: https://issues.apache.org/jira/browse/STORM-153
             Project: Apache Storm (Incubating)
          Issue Type: Improvement
            Reporter: James Xu


https://github.com/nathanmarz/storm/issues/620

Our storm & zookeeper cluster has reached a scaling limit. On the
zookeeper we retain storm state, kafka commit offsets and trident
transaction state. Our hardware is no joke (6 RAIDed disks with a
512MB cache). The reason I say it's reached a scaling limit is that a
few users have reported trying to deploy a topology, only to see the
workers continually die and restart with "executor not alive" in the
Nimbus logs. Additionally, when a user recently deployed a topology
with 140 workers, not only did his topology enter continuous worker
death & restart, but at the same moment half of production also went
into this state and never recovered until I manually deactivated a
bunch of production topologies. So this person's topology caused
previously stable workers to get "executor not alive" exceptions.

We have a dedicated zookeeper SRE who says zkget, zk list children,
and the heartbeats of zookeeper ephemeral nodes are relatively very
cheap operations, but zk create and set are expensive since they
require quorum, and he's never seen a zookeeper cluster with so many
quorum operations at any company he's worked at in the past. The rate
of quorum ops tends to sit at around a 1.5k/s sustained rate. If you
do the math, starting with the fact that we have 1.2k workers and that
our workers are configured for 1 heartbeat a second, that implies 1.2k
zksets per second, which is 80% of the total number of quorum
operations we're measuring via zookeeper metrics. Comparing our
empirical throughput to this benchmark
http://zookeeper.apache.org/doc/r3.1.2/zookeeperOver.html, our real
world results are not quite as good as what's claimed there, but in
the same order of magnitude. Storm also uses much larger znode
payloads, approaching 1MB in some cases. We need a way to scale to AT
LEAST 10k workers, preferably 100k workers. So what are the solutions
here?

SOLUTION #1

Ephemeral nodes are an obvious solution here. There's a few hidden
complexities:

#1 We're not actually reducing the peak quorum operation rate, because
 creating an ephemeral node requires a quorum operation. Clearly it's
 less load overall though, because you don't have sustained zksets
 every second. However, if hitting a peak quorum operation rate of
 1.5k/s were to cause zk sessions to die, such as what we've observed
 on the current cluster, then ephemeral nodes don't solve this
 problem, because they still require quorum to create. I think the
 only way to determine whether this will occur is to build it and test
 extensively. (A sketch of how #1, #2 and #6 could fit together
 follows these notes.)

I'm not sure what Netflix Curator does here, but IF the ephemeral
nodes are immediately recreated when the session is re-established,
then you easily enter into cascading failure, because higher reconnect
rates are then positively correlated with higher quorum ops, so this
could easily enter a tailspin. A simple solution here is to NOT
recreate the ephemeral nodes during a reconnect, and instead wait for
the heartbeat to occur.

#2 Ephemeral nodes are tied to the zk session, so if you kill -9 a
 worker, from my understanding there will be some latency before the
 ephemeral node is actually cleaned up. If the worker is restarted and
 tries to recreate the node during start-up, the node will already
 exist, so it should delete any existing node on start-up. Re:

http://developers.blog.box.com/2012/04/10/a-gotcha-when-using-zookeeper-ephemeral-nodes/
 (URL courtesy of Siddharth)

#3 During a leader election, I see all the zk sessions dying, and
 curator code entering into a reconnecting phase. (This is true for
 zookeeper 3.4 and 3.3, maybe not 3.5.) Since ephemeral nodes have a
 lifetime tied to zk sessions, all the ephemeral nodes will disappear
 during a zk election. We don't want the whole storm cluster to die
 during zk elections, so the nimbus algorithm must do more than just
 kill workers when the ephemeral nodes disappear. Instead you want a
 different algorithm, something that waits for ephemeral nodes to have
 been gone for a certain length of time.

#5 Strictly speaking this solution is not horizontally scalable: it
 scales with the number of nodes in the zookeeper cluster, but the
 achievable quorum op rate actually decreases as you add zookeeper
 nodes, since each write has to be acknowledged by a larger quorum.
 Not sure what the upper bound is here and whether this actually gets
 us to 100k or just 10k.

#6 What code does the ping to keep the zookeeper session alive? Is
 that zookeeper client code? Because it should be run in a
 high-priority thread. I've seen storm workers die from "executor not
 alive" just because they had CPU load spikes and the heartbeat didn't
 get in; once I raised the priority of that thread it solved the
 problem. The ping thread should do the same.

Anyone have experience with using ephemeral nodes as a health status
metric, not merely for service discovery?

Overall all these complexities seem manageable.
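
To make #1, #2 and #6 concrete, here is a rough sketch of what an
ephemeral-node heartbeater could look like on top of Curator. None of
this is existing Storm code; the class name, znode path and timings
are illustrative assumptions:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;

    // Hypothetical ephemeral-node heartbeater; not existing Storm code.
    public class EphemeralHeartbeater {
        private final CuratorFramework zk;
        private final String path;  // e.g. /storm/alive/<worker-id> (made up)

        public EphemeralHeartbeater(String connectString, String path) {
            this.zk = CuratorFrameworkFactory.newClient(
                    connectString, new ExponentialBackoffRetry(1000, 3));
            this.path = path;
            zk.start();
        }

        // #2: a previous incarnation's ephemeral node can linger briefly
        // after a kill -9, so clear any stale node before creating our own.
        public void register(byte[] payload) throws Exception {
            if (zk.checkExists().forPath(path) != null) {
                zk.delete().forPath(path);
            }
            zk.create().withMode(CreateMode.EPHEMERAL).forPath(path, payload);
        }

        // #1: do NOT recreate the node from a RECONNECTED connection-state
        // listener; only this periodic check recreates it, so a burst of
        // reconnects does not translate directly into a burst of quorum
        // ops. The ongoing liveness signal is the zk session ping itself,
        // which is local to the server we are connected to.
        private void ensureRegistered(byte[] payload) {
            try {
                if (zk.checkExists().forPath(path) == null) {
                    zk.create().withMode(CreateMode.EPHEMERAL)
                      .forPath(path, payload);
                }
            } catch (Exception e) {
                // swallow and retry on the next tick
            }
        }

        // #6: run the periodic check on a high-priority thread so CPU load
        // spikes in the worker do not starve it.
        public void startBeating(final byte[] payload, long periodSecs) {
            ScheduledExecutorService exec =
                Executors.newSingleThreadScheduledExecutor(r -> {
                    Thread t = new Thread(r, "zk-heartbeat");
                    t.setDaemon(true);
                    t.setPriority(Thread.MAX_PRIORITY);
                    return t;
                });
            exec.scheduleAtFixedRate(() -> ensureRegistered(payload),
                                     0, periodSecs, TimeUnit.SECONDS);
        }
    }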

SOLUTION #2

You actually don't need state persistence or quorum to do heartbeats;
those are what zookeeper adds, and they are what causes the
complexity. Heartbeats could just be sent to a series of
heartbeat-daemons that have no persistent storage, using random
distribution. Nimbus would then query the heartbeat-daemons to
understand the state of the cluster. This is horizontally scalable and
could scale to 100k. We would use zookeeper to do discovery of the
heartbeat-daemons. This would reduce the load on zookeeper by several
orders of magnitude.

This is similar to how mesos does heartbeats. Mesos actually has the
'master' receive heartbeats from all the slaves. They've scaled this
to a couple thousand slaves, and are looking at partitioning the
heartbeats across the non-elected masters as well as the master to
scale this up further. So that's another idea: use all the nimbus
masters to handle this, which is nice because you don't have to add
yet another daemon to the storm ecosystem.
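
A minimal sketch of what such a stateless heartbeat-daemon could look
like (purely illustrative; no such daemon exists in storm today):
workers report their id periodically over whatever transport, the
daemon keeps only an in-memory last-seen map, and nimbus asks it which
workers have gone quiet.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical stateless heartbeat daemon: no persistence, no quorum.
    // If it dies, workers simply re-announce themselves to another daemon
    // discovered via zookeeper.
    public class HeartbeatDaemon {
        private final Map<String, Long> lastSeen = new ConcurrentHashMap<>();

        // Called for every heartbeat a worker sends (e.g. over Thrift/HTTP).
        public void heartbeat(String workerId) {
            lastSeen.put(workerId, System.currentTimeMillis());
        }

        // Nimbus polls this instead of reading per-worker znodes.
        public List<String> deadWorkers(long timeoutMs) {
            long now = System.currentTimeMillis();
            List<String> dead = new ArrayList<>();
            for (Map.Entry<String, Long> e : lastSeen.entrySet()) {
                if (now - e.getValue() > timeoutMs) {
                    dead.add(e.getKey());
                }
            }
            return dead;
        }
    }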

SOLUTION #3

You can also decrease the heartbeat frequency from 1 second to 10
seconds. Presumably this will let you scale 10x. Any drawbacks to
this? I know the nimbus UI stats will update less frequently and it'll
take slightly longer to find dead workers; is that it? It doesn't get
us to 100x scale, which the first 2 solutions potentially could.
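
For reference, the knobs involved are along these lines in storm.yaml;
exact option names and defaults vary by storm version, so treat the
values below as illustrative only:

    # heartbeat frequency and the timeouts that must stay well above it
    worker.heartbeat.frequency.secs: 10
    task.heartbeat.frequency.secs: 10
    nimbus.task.timeout.secs: 60
    supervisor.worker.timeout.secs: 60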

---------- rgs1: wrt:

" Ephemeral nodes are tied to zk session, so if you kill -9 a worker,
from my understanding there will be some latency in when the ephemeral
node is actually cleaned up. If the worker is restarted, and tries to
recreate the node during start-up the node will already exist, so it
should delete existing nodes on start-up."

this is trivially cheap in most cases, where the session will be gone
for good, since you would just call exists() before the delete(),
which is not a quorum operation (it is served from memory on whatever
host you are connected to).

---------- d2r: +1

Regarding "storm also uses much larger zknode payload approaching 1mb
in some cases" (see #604): I think this might happen depending on the
number of workers & parallelism of the topology itself, not the number
of workers in the cluster.

---------- revans2: We ran into similar scaling issues too. We have
not done anything permanent yet, except reconfigure the nodes that ZK
is running on to handle more IOPS, which seems to be the primary
limiting factor for us. We are not at the same scale that you are yet,
though, so the number of network connections to ZK may start to be
more of an issue soon.

I did some research into the size of the data sent to ZK by our
instance of storm. I found that the data is rather compressible: with
a simple GZIPOutputStream I could reduce the size to 1/5th of the
original at the cost of some CPU. The 1MB limit that ZK has is also
configurable via -Djute.maxbuffer=<NUM_BYTES>, although it has to be
configured for all the ZK nodes, the supervisors, and all the workers
too, because it is a limit on reading jute-encoded data (what ZK uses
for serialization).
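
As a rough illustration of the compression revans2 describes
(gzip-wrapping the serialized heartbeat before handing it to ZK; this
is not something storm does out of the box):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.util.zip.GZIPInputStream;
    import java.util.zip.GZIPOutputStream;

    // Gzip-wrap a serialized heartbeat before writing it to a znode.
    public final class ZkPayload {
        public static byte[] compress(byte[] raw) throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            GZIPOutputStream gz = new GZIPOutputStream(bos);
            gz.write(raw);
            gz.close();  // close() flushes the gzip trailer
            return bos.toByteArray();
        }

        public static byte[] decompress(byte[] packed) throws Exception {
            GZIPInputStream gz =
                new GZIPInputStream(new ByteArrayInputStream(packed));
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) != -1) {
                bos.write(buf, 0, n);
            }
            return bos.toByteArray();
        }
    }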

Solution 2 is how Hadoop, Mesos, and many other large systems
work. But it requires a new solution to persist state and will need a
lot of work to be resistant to multiple failures, which are things
that ZK gives you for free. Most of these large systems that heartbeat
to a central server either have no state recovery on failure, or if
they do, it was added only recently. Because the zk layer is
abstracted away from the code that reads/persists the state, we could
make it pluggable and play around with other key/value stores. We
could still leave ZK for things that need it, like leader election.

Another thing we thought about doing was creating a RAM disk and
configuring ZK to write edit logs to the RAM disk, along with very
aggressive rolling and cleanup of those logs. This would only work if
IOPS is really the bottleneck for you and not something else in ZK
like lock contention or CPU.
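
For anyone who wants to experiment with that, the setup is roughly a
tmpfs mount plus pointing the ZK transaction log at it with aggressive
autopurge. Paths and sizes below are placeholders, and the txn log on
that node is lost on reboot, so this is strictly a trade-off:

    # /etc/fstab (size and mount point are placeholders)
    tmpfs  /zk-txnlog  tmpfs  size=2g  0 0

    # zoo.cfg
    dataDir=/var/zookeeper/data
    dataLogDir=/zk-txnlog
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=1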

---------- rgs1: One detail of implementing heartbeats via ephemeral
znodes (so the actual heart-beating is delegated to ZooKeeper pings,
which are local to each cluster node - not a quorum operation - hence
very cheap) is how to deal with the transient nature of ephemeral
znodes. Because of network hiccups (and client issues) ephemeral nodes
can come and go, which means workers could be killed/restarted when
flaps happen. We could mitigate this (pretty much entirely) by not
reacting immediately to ephemeral nodes appearing/disappearing but
keeping a local tab of the available nodes and only removing them when
they've been gone for a while (and potentially only considering them
alive when they've been around for a while). This shouldn't be
expensive to implement, and in terms of ZK it means polling via
continuous getChildren() calls, which are cheap and local (to a
cluster node).
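
A sketch of that debouncing, assuming nimbus polls getChildren() on a
heartbeat root; the class name and grace period are made up:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Only treat a worker as dead after its ephemeral node has been
    // missing for a full grace period, so zk session flaps alone do not
    // get workers killed.
    public class FlapFilter {
        private final long graceMs;
        private final Map<String, Long> missingSince = new HashMap<>();

        public FlapFilter(long graceMs) {
            this.graceMs = graceMs;
        }

        // 'children' is the latest getChildren() result on the heartbeat
        // root; 'expected' is the set of workers nimbus has assigned.
        public Set<String> confirmDead(Set<String> expected,
                                       List<String> children) {
            long now = System.currentTimeMillis();
            Set<String> present = new HashSet<>(children);
            Set<String> dead = new HashSet<>();
            for (String worker : expected) {
                if (present.contains(worker)) {
                    missingSince.remove(worker);        // came back: reset
                } else {
                    Long since = missingSince.get(worker);
                    if (since == null) {
                        missingSince.put(worker, now);  // first time missing
                    } else if (now - since > graceMs) {
                        dead.add(worker);               // gone long enough
                    }
                }
            }
            return dead;
        }
    }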

---------- d2r: Regarding the discussion on storm-user, we'd like to
secure heartbeats such that workers of one topology cannot access
those of another topology. What follows are the requirements we have
and how we're approaching them. Maybe this is valuable to keep in mind
as we're discussing the new design, so that the design does not make
it unnecessarily difficult to add confidentiality guarantees to
heartbeats later on.

I think it is sufficient to guarantee the following:

- the storm cluster can do anything it wants
- topology workers can only read heartbeats of workers in that same
  topology
- topology workers can only write to or delete heartbeats of workers
  in that same topology

We're trying out ZooKeeper ACLs with MD5-digest authentication in the
near term. The only tricky thing really is getting around the fact
that child znodes do not inherit ACLs from their parents, and so the
ACLs must be explicitly set on each znode.

This complication arises in two cases with the current implementation:

- when nimbus cleans up worker heartbeats in ZK, it must be authorized
  to remove those nodes and any children
- the worker may no longer clean up its own heartbeats, as allowing
  DELETE on the parent node (created by nimbus) would also allow any
  other worker to delete the worker's heartbeats

We have the worker set an ACL with two entries: one to allow the
worker itself full access, and one that uses the digest of the
cluster's credentials to allow full access to the cluster's user.
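
A sketch of creating a heartbeat znode with such a two-entry digest
ACL; the surrounding class, user names and paths are placeholders:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Id;
    import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

    public class SecureHeartbeat {
        // Create the worker's heartbeat znode with two digest ACL entries:
        // full access for the worker's own credentials and for the
        // cluster user's credentials.
        public static void create(ZooKeeper zk, String path, byte[] payload,
                                  String workerCreds,   // "worker-xyz:secret"
                                  String clusterCreds)  // "storm:secret"
                throws Exception {
            // Authenticate this session as the worker so its ACL entry
            // applies to subsequent operations.
            zk.addAuthInfo("digest", workerCreds.getBytes("UTF-8"));

            List<ACL> acls = Arrays.asList(
                new ACL(ZooDefs.Perms.ALL, new Id("digest",
                        DigestAuthenticationProvider.generateDigest(workerCreds))),
                new ACL(ZooDefs.Perms.ALL, new Id("digest",
                        DigestAuthenticationProvider.generateDigest(clusterCreds))));

            // Child znodes do not inherit ACLs, so the same list has to be
            // set on every znode created under this path as well.
            zk.create(path, payload, acls, CreateMode.PERSISTENT);
        }
    }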




--
This message was sent by Atlassian JIRA
(v6.1.4#6159)
