@Guozhang
We actually have separate topics depending on the source of the message and
the multicast distribution group (the set of destinations). Our topics are
named: source_multicast-group. We do not aggregate data but we do static
routing based on the destination and the destination set (that is, we set
up a tree of mirrormakers to copy the topic from the original datacenter to
the others). This gives us a static topology (no path failure resilience)
and limits the number of multicast groups (since each multicase group needs
a different topic for every source), but for our data replication pattern
is a good match. It also helps that the order of writes in our system is
not important, so we do not need a single point of aggregation :)
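
To make the naming concrete, here is a minimal sketch of the convention
(the datacenter and multicast-group names are hypothetical, not our
production values):

  // Topic name = "<sourceDC>_<multicastGroup>"; a destination mirrors every
  // topic of every group it belongs to, from every other source DC.
  import java.util.*;

  public class TopicNaming {
      static String topicFor(String sourceDc, String multicastGroup) {
          return sourceDc + "_" + multicastGroup;
      }

      static List<String> topicsToMirror(String destDc,
                                         Map<String, Set<String>> groupMembers,
                                         Set<String> allDcs) {
          List<String> topics = new ArrayList<>();
          for (String src : allDcs) {
              if (src.equals(destDc)) continue;            // no self-copy
              for (Map.Entry<String, Set<String>> g : groupMembers.entrySet()) {
                  if (g.getValue().contains(destDc)) {     // dest is a member
                      topics.add(topicFor(src, g.getKey()));
                  }
              }
          }
          return topics;
      }
  }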

@Jun
The actual problem is the congestion window; I do not think we are
suffering due to the transmit/receive socket buffers (we are using the same
buffers over different links with similar RTT but different loss rates and
the TCP connection throughput varies a lot, which would not be the case if
the amount of in-flight data were limited by the buffer size). The
socket-level cwnd metrics also support our hypothesis, and we have measured
with iperf what a single connection can transport across a lossy inter-DC
link. Jiangjie seems to be suggesting a different blocking scenario,
similar to head-of-line blocking caused by other requests; however,
increasing the number of fetchers will not necessarily help, since all
fetchers will mux their requests over a single TCP connection when sending
requests to a single broker. The TCP connection's congestion window
will continue to be the limiting factor. I would say that the only way out
of this is to pool multiple TCP connections from a single consumer to a
broker.
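
As a rough back-of-the-envelope check (illustrative numbers only, using the
standard Mathis et al. approximation rather than measurements from our
links), both bounds can be estimated like this:

  // Per-connection throughput limits over a long, lossy link (sketch only).
  public class ThroughputBounds {
      public static void main(String[] args) {
          double rttSec = 0.100;      // 100 ms round trip
          double lossRate = 0.001;    // 0.1% packet loss (illustrative)
          double mssBytes = 1460;     // typical TCP MSS

          // Loss-limited bound: throughput ~= (MSS / RTT) * (1.22 / sqrt(p))
          double lossLimited = (mssBytes / rttSec) * (1.22 / Math.sqrt(lossRate));
          System.out.printf("cwnd-limited:     %.2f MB/s%n", lossLimited / 1e6);

          // Single blocking fetcher: one 1 MB response outstanding per RTT.
          double responseBytes = 1 << 20;
          System.out.printf("blocking fetcher: %.2f MB/s%n",
                            responseBytes / rttSec / 1e6);
      }
  }

With 0.1% loss and a 100 ms RTT the loss-limited number already drops well
under 1 MB/s, which is consistent with the few Mbps per connection we
observe; pooling N connections would multiply that bound by roughly N.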

For identical mirroring, I thought that when asking for data between a pair
of offsets, the result should always be the same. Would it be possible to
also produce, indicating the offsets where the data should go?
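
To make the question concrete, this is the kind of loop I have in mind; the
three helpers are hypothetical (in particular produceAtOffsets would need
exactly the "produce at a given offset" capability asked about above):

  // Sketch of offset-preserving mirroring for one partition.
  abstract class IdenticalMirror {
      abstract long logEndOffset(String cluster, String topic, int partition);
      abstract byte[][] fetchRange(String cluster, String topic, int partition,
                                   long fromOffset, long toOffset);
      abstract void produceAtOffsets(String cluster, String topic, int partition,
                                     long firstOffset, byte[][] records);

      void mirrorPartition(String topic, int partition) {
          long localEnd  = logEndOffset("local",  topic, partition);
          long remoteEnd = logEndOffset("remote", topic, partition);
          // With equal partition counts, everything below the smaller log end
          // offset is already identical, so only the missing tail has to move.
          if (remoteEnd > localEnd) {
              byte[][] tail = fetchRange("remote", topic, partition,
                                         localEnd, remoteEnd);
              // This append must land at exactly [localEnd, remoteEnd) locally.
              produceAtOffsets("local", topic, partition, localEnd, tail);
          }
      }
  }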

Regards,
Vlad

On Wed, Mar 25, 2015 at 10:21 AM, Jiangjie Qin <j...@linkedin.com.invalid>
wrote:

> Hi Jun, I think bumping up socket.receive.buffer.bytes will help but might
> not be sufficient.
> There are actually two related factors here:
> 1. Pipelining TCP packets when sending a single request/response.
> 2. Pipelining multiple requests/responses
> Bumping up socket.receive.buffer.bytes helps with 1) but does not help
> with 2).
>
> For example, consider the following scenario.
> RTT = 100 ms
> Bandwidth = 1 Gbps (128 MBps).
> Request size = 10KB
> Response size = 1MB
> If we only have a single fetcher which is working in a blocking way, the
> max number of requests we can achieve is 10 requests/sec because it is
> restricted by the RTT. In this case, bumping up the socket buffer size
> will not help. I think this is the situation Vlad mentioned.
>
> One option might be to increase num.consumer.fetchers, so we might have
> more fetcher threads for a single consumer instance (due to the
> implementation, num.consumer.fetchers actually means "at most
> num.consumer.fetchers").
>
> One thing that might be worth considering is whether we can enforce
> pipelining in the new consumer like we do for the new producer. Since we
> have correlation IDs, reordering should be easily handled. I haven't got a
> chance to read the new consumer code, but I think it is worth doing if we
> haven't done so.
>
> Jiangjie (Becket) Qin
>
> On 3/25/15, 9:50 AM, "Jun Rao" <j...@confluent.io> wrote:
>
> >To amortize the long RTT across data centers, you can tune the TCP window
> >size by configuring a larger socket.receive.buffer.bytes in the consumer.
> >
> >For the last one, it seems that you want identical mirroring. The tricky
> >thing is to figure out how to avoid duplicates when there is a failure. We
> >had some related discussion in the context of transactional messaging (
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Transactional+Messaging+
> >in+Kafka
> >).
> >
> >Thanks,
> >
> >Jun
> >
> >On Tue, Mar 24, 2015 at 11:44 AM, vlad...@gmail.com <vlad...@gmail.com>
> >wrote:
> >
> >> Dear all,
> >>
> >> I had a short discussion with Jay yesterday at the ACM meetup and he
> >> suggested writing an email regarding a few possible MirrorMaker
> >> improvements.
> >>
> >> At Turn, we have been using MirrorMaker for a few months now to
> >> asynchronously replicate our key/value store data between our
> >>datacenters.
> >> In a way, our system is similar to Linkedin's Databus, but it uses Kafka
> >> clusters and MirrorMaker as its building blocks. Our overall message
> >>rate
> >> peaks at about 650K/sec and, when pushing data over high bandwidth delay
> >> product links, we have found some minor bottlenecks.
> >>
> >> The MirrorMaker process uses a standard consumer to pull data from a
> >>remote
> >> datacenter. This implies that it opens a single TCP connection to each
> >>of
> >> the remote brokers and muxes requests for different topics and
> >>partitions
> >> over this connection. While this is a good thing in terms of maintaining
> >> the congestion window open, over long RTT lines with rather high loss
> >>rate
> >> the congestion window will cap, in our case at just a few Mbps. While
> >>the
> >> overall line bandwidth is much higher, this means that we have to start
> >> multiple MirrorMaker processes (somewhere in the hundreds), in order to
> >> completely use the line capacity. Being able to pool multiple TCP
> >> connections from a single consumer to a broker would solve this
> >> complication.
> >>
> >> The standard consumer also uses the remote ZooKeeper in order to manage
> >>the
> >> consumer group. While consumer group management is moving closer to the
> >> brokers, it might make sense to move the group management to the local
> >> datacenter, since that would avoid using the long-distance connection
> >>for
> >> this purpose.
> >>
> >> Another possible improvement assumes a further constraint, namely that
> >>the
> >> number of partitions for a topic in both datacenters is the same. In my
> >> opinion, this is a sane constraint, since it preserves the Kafka
> >>ordering
> >> guarantees (per partition), instead of a simple guarantee per key. This
> >> kind of guarantee can be for example useful in a system that compares
> >> partition contents to reach eventual consistency using Merkle trees. If
> >>the
> >> number of partitions is equal, then offsets have the same meaning for
> >>the
> >> same partition in both clusters, since the data for both partitions is
> >> identical before the offset. This allows a simple consumer to just
> >>query
> >> the local broker and the remote broker for their current offsets and, in
> >> case the remote broker is ahead, copy the extra data to the local
> >>cluster.
> >> Since the consumer offsets are no longer bound to the specific
> >>partitioning
> >> of a single remote cluster, the consumer could pull from one of any
> >>number
> >> of remote clusters, BitTorrent-style, if their offsets are ahead of the
> >> local offset. The group management problem would reduce to assigning
> >>local
> >> partitions to different MirrorMaker processes, so the group management
> >> could be done locally also in this situation.
> >>
> >> Regards,
> >> Vlad
> >>
>
>
