[
https://issues.apache.org/jira/browse/KAFKA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13631474#comment-13631474
]
Scott Carey commented on KAFKA-156:
-----------------------------------
Jay --
I agree, the duplication issue does not depend on whether there is a disk or
memory queue. However, in both cases one can choose either to duplicate
messages or to drop them on a failure. In the in-memory case, biasing toward
dropping a message rather than duplicating it on a failure is more acceptable
than in the on-disk case. This is because an in-memory queue is more likely to
suffer loss than a disk queue. For example, a producer may crash or be
kill-9'd, and we would expect in-flight, in-memory data to be lost.
My thoughts on this issue are biased by our legacy system -- each
producer-equivalent would log locally, and then the equivalent of the broker
would 'harvest' these logs with no possible duplication. Loss is possible if
the disks fail on the client, but that would take down the whole app anyway.
Furthermore, we use SSDs on those servers (since late 2008!) and have not had
a single SSD drive failure where data was lost (a couple had their performance
degrade to abysmal levels, but the data was still there).
Additionally, we are able to restart / service the nodes that collect the data
without data loss because of the local spooling. Replication in Kafka will
allow us to do rolling restarts of brokers and achieve similar operational
utility. The need for 'spill to disk' is certainly less with replication
active. However, it doesn't take long on some of our clients to fill our
entire memory buffer with messages -- even a 10 second window of
unavailability means losing messages unless we can spill to disk.
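As a rough illustration of the kind of producer-side spill-to-disk I have in
mind (a minimal sketch only; the class name, buffer size, and file format are
made up for illustration, not anything in Kafka today):

import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch: a bounded in-memory buffer that spills to a local file
// when the memory window is exhausted (e.g. during a short broker outage),
// instead of dropping messages.
public class SpoolingBuffer {
    private final Queue<byte[]> memory = new ArrayDeque<>();
    private final int maxInMemory;
    private final File spoolFile;

    public SpoolingBuffer(int maxInMemory, File spoolFile) {
        this.maxInMemory = maxInMemory;
        this.spoolFile = spoolFile;
    }

    public synchronized void append(byte[] message) throws IOException {
        if (memory.size() < maxInMemory) {
            memory.add(message);
            return;
        }
        // Memory is full: length-prefix the message and append it to the local
        // spool file so it can be replayed to the broker when it comes back.
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(spoolFile, true))) {
            out.writeInt(message.length);
            out.write(message);
        }
    }
}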
On your proposal:
* What happens if there is a 'bubble' in the sequence ids from the broker's
perspective? What does the broker do, and how does the client know to re-send?
(A rough sketch of what I mean follows these two questions.)
* What happens when two clients assign themselves the same id?
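To frame the first question, here is a minimal sketch of the decision a broker
tracking client-assigned sequence ids would face (names and structure are
invented for illustration, not part of your proposal as written):

import java.util.HashMap;
import java.util.Map;

// Hypothetical broker-side check of client-assigned sequence ids. The
// interesting case is a gap (the 'bubble'): the broker cannot tell whether
// the missing messages are lost or merely delayed, and the client needs some
// signal to know it must re-send.
public class SequenceCheck {
    public enum Outcome { ACCEPT, DUPLICATE, GAP }

    private final Map<String, Long> lastSeqByProducer = new HashMap<>();

    public synchronized Outcome check(String producerId, long seq) {
        Long last = lastSeqByProducer.get(producerId);
        if (last == null || seq == last + 1) {
            lastSeqByProducer.put(producerId, seq);
            return Outcome.ACCEPT;     // in order: append to the log
        } else if (seq <= last) {
            return Outcome.DUPLICATE;  // already seen: ack again, do not append
        } else {
            return Outcome.GAP;        // the 'bubble': reject? buffer? what tells the client?
        }
    }
}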
Answer to the question about my proposal:
* It is assumed that the final batch commit is idempotent, so if the client
fails to get the final ACK (step 4, "Acknowledge Batch Commit"), it will go
back to step 3 and send the batch commit message again. If it is the same
broker, it can simply acknowledge, since it already committed it. If it is a
replica, then there are two cases:
a) The other broker has the UUID info (which is replicated?) and can restart
the process at the right point.
b) Failover to another broker starts the process over at step 1 with the same
UUID, and when the broker that crashed comes back online, the brokers in the
replica set reconcile to remove the duplicate. There are a limited number of
in-flight or recently in-flight batches.
I think b will work, but I don't know enough about how a broker replica set
reconciles in 0.8 when one fails. If we assume strict ordering on whether the
replica or the client gets the ACK for a batch commit first, a repair process
should be consistent.
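To make the idempotent commit re-send concrete, here is a minimal sketch of
broker-side handling keyed by the batch UUID (the class and method names are
my own invention, not actual 0.8 code):

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical broker-side handler: committing a batch is keyed by its UUID,
// so a retried "commit batch" request after a lost ACK is harmless. Only a
// bounded window of recent batch UUIDs would need to be retained.
public class BatchCommitHandler {
    private final Map<UUID, Long> committed = new ConcurrentHashMap<>();

    // Returns the offset at which the batch was (or already had been) committed.
    public long commitBatch(UUID batchId, byte[] batchData) {
        return committed.computeIfAbsent(batchId, id -> appendToLog(batchData));
    }

    private long appendToLog(byte[] batchData) {
        // Placeholder for the real log append; returns the committed offset.
        return 0L;
    }
}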
A two-phase produce doesn't have to be serial from batch to batch -- a few
pipelined requests could be supported, though too many could be abused for a
denial of service. A high-water-mark approach is more difficult to pipeline,
but probably does not need it.
One idea I had is far more radical. It boils down to these questions:
Why even have a separate producer protocol at all? Why isn't the consumer
protocol good enough for getting data to the brokers?
I admit this is tricky and I have not thought it through well, but I think it
is worth sharing. The consumer protocol is highly reliable and makes it easy
to enforce once-only semantics. If there were some sort of client-initiated
broker 'pull' using the consumer protocol, there might be opportunities for
overall simplification of the protocol and more code sharing.
A producer would be required to assign an offset id and increment it per
message. The producer would trigger the broker to initiate a request to read
all of the batches from that starting id to the "end", commit them, then start
from the last offset to the "end", and repeat. This makes a producer like a
broker -- except that it wants to drop data a lot faster, and therefore needs
to know how far along the broker is in pulling data down. Perhaps it can
safely assume that if batch "1 to 50" was requested, and subsequently batch
"51 to 100" is requested, the request for the latter batch indicates that the
first has been successfully committed; but that serializes batches and
prevents pipelining. Alternatively, the "1 to 50 is committed" message can
ride with the "get 51 to 100" request.
What I find useful here is the same thing that is great about the consumer
protocol: putting the burden on the one obtaining the data to track progress is
cleaner in the face of failure.
This bears similarity to your proposal, but with inversion of control -- the
broker asks for the next batch when it is ready. If there is a broker
failure, the replica can pull in messages, and duplicate removal can occur when
they reconcile (and the offset id in the topic will be consistent, since both
sides use offsets). Producers are then responsible for buffering up to the
threshold they can tolerate, and can spool to disk if they please (perhaps
re-using some broker code to do so).
> Messages should not be dropped when brokers are unavailable
> -----------------------------------------------------------
>
> Key: KAFKA-156
> URL: https://issues.apache.org/jira/browse/KAFKA-156
> Project: Kafka
> Issue Type: Improvement
> Reporter: Sharad Agarwal
> Fix For: 0.8
>
>
> When none of the brokers are available, the producer should spool the
> messages to disk and keep retrying until the brokers come back.
> This will also enable broker upgrades/maintenance without message loss.