[ https://issues.apache.org/jira/browse/KAFKA-156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13621433#comment-13621433 ]
Scott Carey commented on KAFKA-156:
-----------------------------------
I am positive that the producer wire protocol has to have built-in features to
prevent dropped messages when brokers are unavailable. There is no way to
achieve 'optimal transmission' without two-phase commit or idempotence between
the producer and broker. I define 'optimal transmission' as the guarantee that
data is neither duplicated nor lost after some well-known point has been
reached, as viewed by the producer. Prior to this point (for example, while the
message sits in an in-memory queue), there can be no guarantees from any
system.
{quote}
"FWIW Clearspring has a pipeline with: ConcurrentQueue --> spill to disk queue
with max size (then drops messages) --> SyncProducer with retry/backoff. "
{quote}
Such a system can get as close as only losing or duplicating one 'batch' of
messages, where that batch size is >= 1 message. At best, when reading back the
data spilled to disk, a crash at either end between sending a batch and
receiving acknowledgement will leave that batch in limbo. The batch needs an
identifier that both sides can persist or generate, so that either side can
identify the batch when recovering from a crash (two-phase commit).
Many database systems have this (see
http://www.postgresql.org/docs/9.2/static/sql-prepare-transaction.html). As a
client you can name a transaction; once the prepare step is acknowledged, the
client logs that the transaction has been prepared and then sends the commit
command. If it crashes before getting the commit acknowledgement, upon recovery
it looks up the identifier of the in-flight commit and asks the system whether
it succeeded.
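For concreteness, a minimal sketch of that flow against PostgreSQL over JDBC.
The events table, its values, and the batch-42 transaction name are made up for
illustration, but PREPARE TRANSACTION, COMMIT PREPARED, and the
pg_prepared_xacts view are the actual PostgreSQL mechanisms the page above
documents:
{code:java}
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PreparedCommit {
    public static void main(String[] args) throws Exception {
        Connection c = DriverManager.getConnection("jdbc:postgresql:mydb");
        String gid = "batch-42"; // client-chosen name, logged durably before use

        try (Statement s = c.createStatement()) {
            s.execute("BEGIN");
            s.execute("INSERT INTO events VALUES (42, 'payload')");
            // Phase one: the transaction is now durable on the server under gid.
            s.execute("PREPARE TRANSACTION '" + gid + "'");
            // ... log gid locally here; a crash past this point is recoverable ...
            // Phase two:
            s.execute("COMMIT PREPARED '" + gid + "'");
        }
    }

    /** Recovery: ask the server whether the named transaction is still pending.
     *  If present, re-issue COMMIT PREPARED; if absent, consult the local log,
     *  since the transaction was already committed or rolled back. */
    static boolean stillPending(Connection c, String gid) throws Exception {
        try (Statement s = c.createStatement();
             ResultSet rs = s.executeQuery(
                 "SELECT 1 FROM pg_prepared_xacts WHERE gid = '" + gid + "'")) {
            return rs.next();
        }
    }
}
{code}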
We have an internal system that we are attempting to replace with Kafka, but
Kafka does not guarantee delivery the way that system does. We spool data on
our producers into batches (a file per batch), and then transfer these batches
into the downstream system. That system stores each batch in a staging area, so
that if either side crashes before the transfer completes, recovery is simple.
Upon validating that the batch (which is uniquely named) is identical on both
sides, the producer removes its local copy and the batch is promoted from the
staging area to the completed area (atomically). This again is safe if either
side crashes, since a batch in the staging area that no longer exists on the
producer must have been transferred successfully.
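A rough sketch of the recovery rule that makes this safe. The directory names
(outgoing on the producer, staging and completed downstream) are hypothetical,
but the atomic rename and the "staged but gone from the producer" test are the
scheme described above:
{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class BatchPromoter {

    /** Normal path: after both sides validate the uniquely named batch, the
     *  producer deletes its copy and the batch is promoted with an atomic
     *  rename, so no crash can leave a half-promoted batch. */
    static void promote(Path staging, Path completed, String batchId) throws IOException {
        Files.move(staging.resolve(batchId), completed.resolve(batchId),
                   StandardCopyOption.ATOMIC_MOVE);
    }

    /** Crash recovery: a staged batch whose producer copy is gone must have
     *  finished transferring, so promoting it is safe; if the producer copy
     *  still exists, the transfer and validation are simply re-run. */
    static void recover(Path producerOutgoing, Path staging, Path completed) throws IOException {
        try (DirectoryStream<Path> staged = Files.newDirectoryStream(staging)) {
            for (Path batch : staged) {
                String batchId = batch.getFileName().toString();
                if (!Files.exists(producerOutgoing.resolve(batchId))) {
                    promote(staging, completed, batchId);
                }
            }
        }
    }
}
{code}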
Kafka will have to mimic this sort of safety at each stage. On the consumer
side, the batch offset plus partition and topic information serves as a unique
identifier for a batch, which allows only-once semantics (see the sketch
below). On the producer side, is there something equivalent?
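By "unique identifiers" I mean bookkeeping like the following. The class is
illustrative; what matters is that the (topic, partition, offset) triple is a
durable, replayable key the consumer can check on restart:
{code:java}
import java.util.HashMap;
import java.util.Map;

public class OffsetDedupe {
    // Highest offset already applied per topic/partition; in a real consumer
    // this map would be persisted atomically with the processing results.
    private final Map<String, Long> lastApplied = new HashMap<>();

    /** Returns true if the batch at this position has not been applied yet. */
    public boolean firstTime(String topic, int partition, long offset) {
        String key = topic + "/" + partition;
        Long prev = lastApplied.get(key);
        if (prev != null && offset <= prev) {
            return false; // redelivery of a batch we already processed
        }
        lastApplied.put(key, offset);
        return true;
    }
}
{code}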
Replication mitigates the problem significantly, but there is still the
possibility that an item is dropped or duplicated during a transient network
issue that TCP/IP does not handle, unless the broker hands out unique batch ids
for each batch (I am unsure whether it does).
If messages are spooled to disk when a broker is unavailable, the process of
reading items back from that log and sending them to the broker without loss or
duplication is tricky. Each batch of messages needs an identifier shared
between the broker and producer, and the batch must be durably marked with that
identifier on disk prior to being sent. After acknowledgement the producer can
delete the batch or mark it complete. If it crashes between sending the batch
and receiving a response (or otherwise fails to get acknowledgement), it must
be able to ask the broker whether the batch with the given identifier was
received; alternatively, it can send the batch twice and the broker will ignore
the duplicate based on the identifier.
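As a sketch of that discipline, with an entirely hypothetical broker API
(wasReceived/send do not exist in the current protocol, which is exactly the
question below), the essential part is the ordering: the id becomes durable
before the send, and the spooled batch is deleted only after the
acknowledgement:
{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class SpoolSender {
    interface Broker { // hypothetical; nothing like this exists in the wire protocol today
        void send(String batchId, byte[] payload) throws IOException;
        boolean wasReceived(String batchId) throws IOException;
    }

    /** Spool: the batch id is assigned and made durable (here, as the file
     *  name) before any attempt to send. */
    static Path spool(Path spoolDir, byte[] payload) throws IOException {
        return Files.write(spoolDir.resolve(UUID.randomUUID().toString()), payload);
    }

    /** Drain: after a crash we cannot know whether the last send landed, so
     *  either ask the broker about the id first, or just resend and rely on
     *  the broker to drop duplicate ids. */
    static void drain(Broker broker, Path batch) throws IOException {
        String batchId = batch.getFileName().toString();
        if (!broker.wasReceived(batchId)) {
            broker.send(batchId, Files.readAllBytes(batch));
        }
        Files.delete(batch); // acknowledged: safe to forget
    }
}
{code}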
Does the producer wire protocol include batch ids generated by the broker so
that this can be implemented? It does not seem to be the case here:
https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-ProduceAPI
The protocol does not appear to support "only once" message semantics.
> Messages should not be dropped when brokers are unavailable
> -----------------------------------------------------------
>
> Key: KAFKA-156
> URL: https://issues.apache.org/jira/browse/KAFKA-156
> Project: Kafka
> Issue Type: Improvement
> Reporter: Sharad Agarwal
> Fix For: 0.8
>
>
> When none of the brokers are available, the producer should spool the
> messages to disk and keep retrying until the brokers come back.
> This will also enable broker upgrades/maintenance without message loss.