Author: jkreps
Date: Fri Aug 30 05:33:32 2013
New Revision: 1518892
URL: http://svn.apache.org/r1518892
Log:
Document replication in a little more detail. Cover unclean leader election.
Modified:
kafka/site/08/design.html
kafka/site/08/introduction.html
Modified: kafka/site/08/design.html
URL:
http://svn.apache.org/viewvc/kafka/site/08/design.html?rev=1518892&r1=1518891&r2=1518892&view=diff
==============================================================================
--- kafka/site/08/design.html (original)
+++ kafka/site/08/design.html Fri Aug 30 05:33:32 2013
@@ -145,9 +145,9 @@ Now that we understand a little about ho
It's worth noting that this breaks down into two problems: the durability
guarantees for publishing a message and the guarantees when consuming a message.
<p>
-Many systems claim to provide "exactly once" delivery semantics, but it is
important to read the fine print, most of these claims are misleading (i.e.
they don't translate to the case where consumers or producers can fail or cases
where there are multiple consumer processes).
+Many systems claim to provide "exactly once" delivery semantics, but it is
important to read the fine print, most of these claims are misleading (i.e.
they don't translate to the case where consumers or producers can fail, or
cases where there are multiple consumer processes, or cases where data written
to disk can be lost).
<p>
-Kafka's semantics are straight-forward. When publishing a message we have a
notion of the message being "committed" to the log. Once a published message is
committed it will not be lost as long as one broker remains "alive". The
definition of alive, which will be described in more detail later, translates
roughly to "not crashed" and able to keep up with the leader. If a producer
attempts to publish a message and experiences a network error it cannot be sure
if this error happened before or after the message was committed. This is
similar to the semantics of inserting into a database table with an
autogenerated key.
+Kafka's semantics are straight-forward. When publishing a message we have a
notion of the message being "committed" to the log. Once a published message is
committed it will not be lost as long as one broker that replicates the
partition to which this message was written remains "alive". The definition of
alive as well as a description of which types of failures we attempt to handle
will be described in more detail in the next section. For now let's assume a
perfect, lossless broker and try to understand the guarantees to the producer
and consumer. If a producer attempts to publish a message and experiences a
network error it cannot be sure if this error happened before or after the
message was committed. This is similar to the semantics of inserting into a
database table with an autogenerated key.
<p>
These are not the strongest possible semantics for publishers. Although we
cannot be sure of what happened in the case of a network error, it is possible
to allow the producer to generate a sort of "primary key" that makes retrying
the produce request idempotent. This feature is not trivial for a replicated
system because of course it must work even (or especially) in the case of a
server failure. With this feature it would suffice for the producer to retry
until it receives acknowledgement of a successfully committed message at which
point we would guarantee the message had been published exactly once. We hope
to add this in a future Kafka version.
<p>
@@ -164,40 +164,69 @@ So effectively Kafka guarantees at-least
<h3><a id="replication">4.7 Replication</a></h3>
<p>
-Kafka replicates the log for each topic's partitions across the number of
servers configured with each topic. This allows automatic failover when a
server in the cluster fails so messages remain available in the presence of
failures.
+Kafka replicates the log for each topic's partitions across a configurable
number of servers (you can set this replication factor on a topic-by-topic
basis). This allows automatic failover to these replicas when a server in the
cluster fails so messages remain available in the presence of failures.
<p>
-Other messaging systems provide some replication-related features but in our
(biased) opinion this appears to be a tacked-on thing not heavily used and with
large downsides: slaves are inactive, throughput is heavily impacted, it
requires fiddly manual configuration, etc. Kafka is meant to be used with
replication by default—in fact we implement un-replicated topics as
replicated topics where the replication factor is one.
+Other messaging systems provide some replication-related features, but, in our
(totally biased) opinion, this appears to be a tacked-on thing, not heavily
used, and with large downsides: slaves are inactive, throughput is heavily
impacted, it requires fiddly manual configuration, etc. Kafka is meant to be
used with replication by default—in fact we implement un-replicated
topics as replicated topics where the replication factor is one.
<p>
-The unit of replication is the topic partition. For each partition Kafka has a
single leader and zero or more followers. The number of replicas is
configurable at the topic level at topic creation time. All reads and writes go
to the leader of the partition; each node is the leader for a share of
partitions and a follower for others. The logs on the followers are
identical—all have the same offsets and messages in the same order
(though, of course, at any given time the leader may have a few as-yet
unreplicated messages at the end of its log).
+The unit of replication is the topic partition. Under non-failure conditions
each partition Kafka has a single leader and zero or more followers. We call
the total number of replicas including the leader the replication factor. All
reads and writes go to the leader of the partition; each node is the leader for
a share of partitions and a follower for others. The logs on the followers are
identical to the leader's log—all have the same offsets and messages in
the same order (though, of course, at any given time the leader may have a few
as-yet unreplicated messages at the end of its log).
<p>
-Followers consume messages from the leader just as a normal Kafka consumer
would and applying them to their own log. Having the followers pull from the
leader has the nice property of allowing the follower to naturally batch
together log entries they are applying to their log. The leader keeps track of
which slaves are "in sync"—meaning not too far behind the leader's own
log. If a consumer dies or falls behind, the leader will remove it from the
list of in sync replicas.
+Followers consume messages from the leader just as a normal Kafka consumer
would and apply them to their own log. Having the followers pull from the
leader has the nice property of allowing the follower to naturally batch
together log entries they are applying to their log.
<p>
-A message is considered "committed" when all in sync replicas for that
partition have applied it to their log. The leader only gives out committed
messages to the consumer. This means that the consumer need not worry about
potentially seeing a message that could be lost if the leader fails. Producers,
on the other hand, have the option of either waiting for the message to be
committed or not, depending on their preference for latency and durability.
+As with most distributed systems automatically handling failures requires
having a precise definition of what it means for a node to be "alive". For
Kafka node liveness has two conditions
+<ol>
+ <li>A node must be able to maintain its session with Zookeeper (via
Zookeeper's heartbeat mechanism)
+ <li>If it is a slave it must replicate the writes happening on the
leader and not fall "too far" behind
+</ol>
+We refer to nodes satisfying these two conditions as being "in sync" to avoid
the vagueness of "alive" or "failed". The leader keeps track of the set of "in
sync" nodes. If a consumer dies or falls behind, the leader will remove it from
the list of in sync replicas. The definition of how far behind is too far is
controlled by the replica.lag.max.messages configuration.
+<p>
+In distributed systems terminology we only attempt to handle a "fail/recover"
model of failures where nodes suddenly cease working and then later recover
(perhaps without knowing that they have died). Kafka does not handle so-called
"Byzantine" failures in which nodes produce arbitrary or malicious responses
(perhaps due to bugs or foul play).
+<p>
+A message is considered "committed" when all in sync replicas for that
partition have applied it to their log. Only committed messages are ever given
out to the consumer. This means that the consumer need not worry about
potentially seeing a message that could be lost if the leader fails. Producers,
on the other hand, have the option of either waiting for the message to be
committed or not, depending on their preference for latency and durability.
This preference is controlled by the request.required.acks setting the producer
uses.
<p>
-The guarantee that Kafka offers is that a committed message will not be lost
as long as a single in sync replica survives.
+The guarantee that Kafka offers is that a committed message will not be lost
as long as a single in sync replica remains.
+<p>
+Kafka will remain available in the presence of node failures after a short
fail-over period, but may not remain available in the presence of network
partitions.
<h4>Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)</h4>
At it's heart a Kafka partition is a replicated log. The replicated log is one
of the most basic primitives in distributed data systems, and there are many
approaches to implementation. A replicated log can be used by other systems as
a primitive for implementing other distributed systems in the <a
href="http://en.wikipedia.org/wiki/State_machine_replication">state-machine
style</a>.
<p>
-A replicated log models the process of coming into consensus on the order of a
series of values (entries 0, 1, 2, ...). There are many ways to implement this,
but the simplest and fastest is with a leader who chooses the ordering of
values provided to it. As long as the leader remains alive and all followers
need only copy the values and ordering it chooses.
+A replicated log models the process of coming into consensus on the order of a
series of values (generally numbering the log entries 0, 1, 2, ...). There are
many ways to implement this, but the simplest and fastest is with a leader who
chooses the ordering of values provided to it. As long as the leader remains
alive and all followers need only copy the values and ordering it chooses.
+<p>
+Of course if leaders didn't fail we wouldn't need followers! When the leader
does die we need to choose a new leader from among the followers. But followers
themselves may fall behind or crash so we must ensure we choose an up-to-date
follower. The fundamental guarantee a log replication algorithm must provide is
that if we tell the client a message is committed, and the leader fails, the
new leader we elect must also have that message. This yields a tradeoff: if the
leader waits for more followers to acknowledge a message before declaring it
committed then there will be more potentially electable leaders.
<p>
-Of course if leaders didn't fail we wouldn't need followers! When the leader
does die we need to choose a new leader from among the followers. But followers
themselves may fall behind or crash so we must ensure we choose an up-to-date
follower. The fundamental guarantee a log replication algorithm must provide is
that if we tell the client a message is committed, and the leader fails, the
new leader we elect must also have that message. This yields a tradeoff: if the
leader waits for more followers to replicate a message before declaring it
committed then there will be more potentially electable leaders.
+If you choose the number of acknowledgements required and the number of logs
that must be compared to elect a leader such that there is guaranteed to be an
overlap then this is called a Quorum.
<p>
-A common approach to this tradeoff is to use a majority (quorum) for both the
commit decision and the leader election. This is not what Kafka does, but let's
explore it anyway to understand the tradeoffs. Let's say we want to tolerate
<i>f</i> failures. If 2<i>f</i>+1 servers must replicate a message prior to a
commit being declared by the leader, and if we elect a new leader by electing
the follower with the most complete log from at least 2<i>f</i>+1 replicas,
then, with no more than <i>f</i> failures there must be at least one server in
common between those that committed the message and the servers that were
available to take over as leader and hence the message will be preserved. There
are many remaining details that each algorithm must handle (such as ensuring
log consistency during leader failure or changing the set of servers in the
replica set) but we will ignore these for now.
+A common approach to this tradeoff is to use a majority vote for both the
commit decision and the leader election. This is not what Kafka does, but let's
explore it anyway to understand the tradeoffs. Let's say we want to tolerate
<i>f</i> failures. If 2<i>f</i>+1 servers must replicate a message prior to a
commit being declared by the leader, and if we elect a new leader by electing
the follower with the most complete log from at least 2<i>f</i>+1 replicas,
then, with no more than <i>f</i> failures there must be at least one server in
common between those that committed the message and the servers that were
available to take over as leader and hence the message will be preserved.
There are many remaining details that each algorithm must handle (such as
precisely defined what makes a log more complete, ensuring log consistency
during leader failure or changing the set of servers in the replica set) but we
will ignore these for now.
<p>
-This quorum approach has a very nice property: the latency is dependent on
only the fastest servers. That is, if the replication factor is three, the
latency is determined by the faster of the two slaves.
+This majority vote approach has a very nice property: the latency is dependent
on only the fastest servers. That is, if the replication factor is three, the
latency is determined by the faster slave not the slower one.
<p>
There are a rich variety of algorithms in this family including Zookeeper's <a
href="http://www.stanford.edu/class/cs347/reading/zab.pdf">Zab</a>, <a
href="https://ramcloud.stanford.edu/wiki/download/attachments/11370504/raft.pdf">Raft</a>,
and <a href="http://pmg.csail.mit.edu/papers/vr-revisited.pdf">Viewstamped
Replication</a>. The most similar academic publication we are aware of to
Kafka's actual implementation is <a
href="http://research.microsoft.com/apps/pubs/default.aspx?id=66814">PacificA</a>
from Microsoft.
<p>
-The downside of the quorum is that the amount of replication to tolerate
failures is a bit high. To tolerate one failure requires five copies of the
data, and to tolerate two failures requires five copies of the data. In our
experience a single failure is not enough, but doing every write five times,
with 5x the disk space requirements and 1/5th the throughput, is just not
practical for large volume data problems. This is likely why quorum algorithms
more commonly appear for shared cluster configuration such as in Zookeeper but
are less common for primary data storage. For example in HDFS the namenode's
high-availability feature is built on quorum-based journal but quorum
journalling is not used for the data itself.
+The downside of majority vote is that it doesn't take many failures to leave
you with no electable leaders. To tolerate one failure requires three copies of
the data, and to tolerate two failures requires five copies of the data. In our
experience having only enough redundancy to tolerate a single failure is not
enough for a practical system, but doing every write five times, with 5x the
disk space requirements and 1/5th the throughput, is not very practical for
large volume data problems. This is likely why quorum algorithms more commonly
appear for shared cluster configuration such as Zookeeper but are less common
for primary data storage. For example in HDFS the namenode's high-availability
feature is built on a <a
href="http://blog.cloudera.com/blog/2012/10/quorum-based-journaling-in-cdh4-1">majority-vote-based
journal</a> but this more expensive approach is not used for the data itself.
+<p>
+Kafka takes a slightly different approach to choosing its quorum set. Instead
of majority vote Kafka dynamically maintains a set of in-sync replicas that are
caught-up to the leader. Only members of this set are eligible for election as
leader. A write to a Kafka partition is not considered committed until
<i>all</i> in-sync replicas have received the write. This ISR set is persisted
to zookeeper whenever it changes. Because of this, any replica in the in-sync
set is eligible to be elected leader. This is an important factor for Kafka's
usage model where there are many partitions and ensuring leadership balance is
important. With this ISR model and <i>N</i> replicas a Kafka topic can tolerate
<i>N</i>-1 failures without losing committed messages.
+<p>
+In practice for most use cases we hope to handle we think this tradeoff is a
reasonable one. In practice to tolerate <i>f</i> failures both the majority
vote and ISR approach will wait for the same number of replicas to acknowledge
before committing a message (e.g. to survive one failure a majority quorum
needs three replicas and one acknowledgement and the ISR approach requires two
replicas and one acknowledgement). The ability to commit without the slowest
servers is an advantage of the majority vote approach but we think it is
ameliorated by allowing the client to choose whether they block on the message
commit or not, and the additional throughput and disk space due to the lower
required replication factor is worth it.
+<p>
+Another important design distinction is that Kafka does not require that
crashed nodes recover with all their data intact. It is not uncommon for
replication algorithms in this space to depend on the existence of "stable
storage" that cannot be lost in any failure-recovery scenario without potential
consistency violations. There are two primary problems with this assumption.
First, disk errors are the most common problem we observe in real operation of
persistent data systems and they often do not leave data intact. Secondly, even
if this were not a problem, we do not want to require the use of fsync on every
write for our consistency guarantees as this can reduce performance by two to
three orders of magnitude. Our protocol for allowing a replica to rejoin the
ISR ensures that before rejoining it must fully re-sync again even if it lost
unflushed data in its crash.
+
+<h4>Unclean leader election: What if they all die?</h4>
+
+Note that Kafka's guarantee with respect to data loss is predicated on at
least on replica remaining in sync. If all the nodes replicating a partition
die, this guarantee no longer holds.
+<p>
+However a practical system needs to do something reasonable when all the
brokers die. If you are unlucky enough to have this happen it is important to
consider what will happen. There are two behaviors that could be implemented:
+<ol>
+ <li>Wait for a broker in the last in-sync set to come back to life and
choose this broker as the leader (hopefully it still has all its data).
+ <li>Choose the first broker to come back to life as the leader.
+</ol>
<p>
-Kafka takes a different approach from quorum replication. Instead Kafka
dynamically maintains a set of in-sync replicas that are caught-up to the
leader. Only members of this set are eligible for election as leader. A write
to a Kafka partition is not considered committed until all in-sync replicas
have received the write. This ISR set is persisted to zookeeper whenever it
changes. Because of this any replica in the in-sync set is eligible to be
elected leader. This is an important factor in Kafka where there are many
partitions and ensuring leadership balance is important. With this ISR model
<i>N</i> replicas a Kafka topic can tolerate <i>N</i>-1 failures without losing
committed messages.
+This is a simple tradeoff between availability and consistency. If we wait for
a broker in the ISR then we will remain unavailable as long as this broker is
down. If this broker was destroyed or its data was lost then we are permanently
down. If, on the other hand, a non-in-sync broker comes back to life and we
allow it to become leader then it's log becomes the source of truth even though
it is not guaranteed to have every committed message. In our current release we
choose the second strategy and favor choosing a potentially inconsistent broker
when all other machines are dead. In the future we would like to make this
configurable to better support uses where downtime is preferable to
inconsistency.
<p>
-In practice for most use cases we hope to handle we think this tradeoff is a
reasonable one. In practice to tolerate <i>f</i> failures both the quorum and
ISR approach will wait for the same number of replicas to acknowledge before
committing a message (e.g. to survive one failure a quorum needs three replicas
and one acknowledgement and the ISR approach requires two replicas and one
acknowledgement). The ability to commit without the slowest servers is an
advantage of the quorum approach but we think it is ameliorated by allowing the
client to choose whether they block on the message commit or not, and the
additional throughput and disk space due to the lower required replication
factor is worth it.
+This dilemma is not specific to Kafka, it exists in any quorum-based scheme.
For example in a majority voting scheme if a majority of servers suffer a
permanent failure then you must either choose to lose 100% of your data or
violate consistency by taking what remains on an existing server as your new
source of truth.
<h4>Replica Management</h4>
-The above discussion on a replicated log covers only a single partition.
However a Kafka cluster will manage hundreds or thousands of these. We attempt
to balance partitions within a cluster in a round-robin fashion to avoid
clustering all partitions for high-volume topics on a small number of nodes.
Likewise we try to balance leadership so that each node is the leader for a
proportional share of its partitions.
+The above discussion on replicated logs really covers only a single log, i.e.
one topic partition. However a Kafka cluster will manage hundreds or thousands
of these. We attempt to balance partitions within a cluster in a round-robin
fashion to avoid clustering all partitions for high-volume topics on a small
number of nodes. Likewise we try to balance leadership so that each node is the
leader for a proportional share of its partitions.
<p>
It is also important to optimize the leadership election process as that is
the critical window of unavailability. A naive implementation of leader
election would end up running an election per partition for all partitions a
node hosted when that node failed. Instead we elect a single "controller" that
is responsible for leadership assignment decisions. This controller serves an
analogous role to the role of leaders themselves—we avoid making a
sequence of leadership decisions by choosing a designated process to make all
these decisions and then handling faults in this process. The result is that we
are able to batch together many of the required leadership change notifications
which makes the election process far cheaper and faster for a large number of
partitions.
Modified: kafka/site/08/introduction.html
URL:
http://svn.apache.org/viewvc/kafka/site/08/introduction.html?rev=1518892&r1=1518891&r2=1518892&view=diff
==============================================================================
--- kafka/site/08/introduction.html (original)
+++ kafka/site/08/introduction.html Fri Aug 30 05:33:32 2013
@@ -11,7 +11,7 @@ First let's review some basic messaging
<li>Kafka is run as a cluster comprised of one or more servers each of
which is called a <i>broker</i>.
</ul>
-So, at a high level, producers are send messages over the network to the Kafka
cluster which in turn serves them up to consumers like this:
+So, at a high level, producers send messages over the network to the Kafka
cluster which in turn serves them up to consumers like this:
<div style="text-align: center; width: 100%">
<img src="../images/producer_consumer.png">
</div>
@@ -73,9 +73,10 @@ Not that partitioning means Kafka only p
<h4>Guarantees</h4>
-Kafka gives the following guarantees
+At a high-level Kafka gives the following guarantees
<ul>
<li>Messages sent by a producer to a particular topic partition will be
appended in the order they are sent. That is if a message M1 is sent by the
same producer as a message M2, and M1 is sent first, then M1 will have a lower
offset then M2 and appear earlier in the log.
- <li>A consumer instance sees messages in the order they are stored in the log
+ <li>A consumer instance sees messages in the order they are stored in the
log.
<li>For a topic with replication factor N, we will tolerate up to N-1 server
failures without losing any messages committed to the log.
-</ul>
\ No newline at end of file
+</ul>
+More details on these guarantees are given in the design section of the
documentation.
\ No newline at end of file