
I want to give a first response:

1. Producer per task:

First, we ran some performance tests, which indicate that the performance
penalty is small. Please have a look here:

For the test, we ran a trunk version and a modified version that uses a
producer per task (of course without transactions, but with
at-least-once semantics). The scaling factor indicates the number of
brokers and (single-threaded) Streams instances. We used
SimpleBenchmark, which is part of the Apache Kafka (AK) code base.

Second, as the design is "producer per task" (and not "producer per
partition"), it is possible to specify a custom PartitionGrouper that
assigns multiple partitions to a single task. This allows reducing the
number of tasks in scenarios with many partitions. Right now, users must
implement this interface themselves, but we could also add a new config
parameter such as max.number.of.tasks or partitions.per.task so that
users can configure this behavior instead of implementing the interface.

Third, there is the idea of a "Producer Pool" that would allow sharing
resources (network connections, memory, etc.) across multiple producers.
This would keep multiple transactions separate at the producer level
while the underlying resources are shared. There is no detailed design
document yet, and this feature would require its own KIP.
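Since there is no design yet, the following is only a hypothetical sketch of the memory-sharing aspect of such a pool: several per-task producers, each still owning its own transaction, reserve batch memory from one shared budget instead of each reserving a full buffer up front. Connection sharing is not modeled, and the class SharedBufferPool and its methods are invented for illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical sketch of a shared memory budget for a "Producer Pool":
// permits represent bytes; every per-task producer draws from the same pool.
public final class SharedBufferPool {
    private final Semaphore available;

    public SharedBufferPool(int totalBytes) {
        this.available = new Semaphore(totalBytes);
    }

    /** Reserves memory for a batch; returns false instead of blocking if the budget is exhausted. */
    public boolean tryReserve(int bytes) {
        return available.tryAcquire(bytes);
    }

    /** Returns memory to the shared budget once a batch has been sent. */
    public void release(int bytes) {
        available.release(bytes);
    }

    public int availableBytes() {
        return available.availablePermits();
    }

    public static void main(String[] args) {
        SharedBufferPool pool = new SharedBufferPool(32 * 1024 * 1024);
        // Producers of two different tasks allocate batches from the same budget.
        boolean task1 = pool.tryReserve(16 * 1024);
        boolean task2 = pool.tryReserve(16 * 1024);
        System.out.println(task1 + " " + task2 + " remaining=" + pool.availableBytes());
        pool.release(16 * 1024);
        pool.release(16 * 1024);
    }
}
```

The point of the design is that total memory is bounded by the pool size rather than growing with the number of tasks.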

Thus, if performance problems arise in high-scale scenarios, there are
multiple ways to tackle them while keeping the "producer per task"
design.

Additionally, a "producer per thread" design would be much more complex.
I summarized the issues in a separate document and will share a link to
it soon.

2. StateStore recovery:

In its first design, Streams EoS will not be able to exploit the
improvements that are currently being added for 0.11. However, as 0.10.2
faces the same issue of potentially long recovery, there is no
regression in this regard. Thus, I see those improvements as orthogonal
add-ons. Nevertheless, we should explore those options and, if possible,
get them into 0.11 so that Streams with EoS gets the same improvements
as the at-least-once case.

3. Caching:

We might need to run some experiments to quantify the impact on caching.
If it is severe, the suggested default commit interval of 100ms could
also be increased. Also, EoS will not enforce any commit interval; it
only changes the default value. Thus, users can freely trade off latency
against caching effectiveness.
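For reference, in the Kafka Streams releases that eventually shipped EoS (0.11.0 and later), exactly-once is enabled with processing.guarantee=exactly_once, and an explicitly set commit.interval.ms overrides the lower EoS default. The sketch below only builds the Properties with plain string keys so it runs without the Kafka jars; the application id, bootstrap address, and the chosen interval and cache size are placeholder assumptions, not recommendations.

```java
import java.util.Properties;

// Sketch: opting into EoS while trading latency for cache effectiveness by
// choosing a commit interval above the EoS default. Values are illustrative.
public final class EosCachingConfig {

    public static Properties build(long commitIntervalMs, long cacheBytes) {
        Properties props = new Properties();
        props.put("application.id", "my-eos-app");        // placeholder
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("processing.guarantee", "exactly_once");
        // The cache is flushed on every commit, so a longer interval lets it
        // consolidate more updates per key, at the cost of higher latency.
        props.put("commit.interval.ms", Long.toString(commitIntervalMs));
        props.put("cache.max.bytes.buffering", Long.toString(cacheBytes));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(1_000L, 10L * 1024 * 1024);
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```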

Last but not least, there is the idea of allowing "read_uncommitted" for
intermediate topics. This would be an advanced design for Streams EoS
that allows downstream sub-topologies to read uncommitted data
optimistically. In case of failure, a cascading abort of transactions
would be required. This change would need another KIP.
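A cascading abort can be pictured as a reachability problem: if a downstream transaction consumed uncommitted records from an upstream transaction that aborts, it must abort too, and so on transitively. The sketch below is purely a hypothetical model of that idea; the transaction names, the downstreamReaders map, and the class CascadingAbort are invented for illustration and are not part of KIP-129 or any Kafka API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model: downstreamReaders maps each transaction to the
// transactions that optimistically read its uncommitted output.
public final class CascadingAbort {

    public static Set<String> transactionsToAbort(String failed,
                                                  Map<String, List<String>> downstreamReaders) {
        Set<String> toAbort = new LinkedHashSet<>();
        Deque<String> pending = new ArrayDeque<>();
        pending.push(failed);
        while (!pending.isEmpty()) {
            String txn = pending.pop();
            // add() returns false for already-visited transactions, so cycles terminate.
            if (toAbort.add(txn)) {
                // Every reader of this transaction's uncommitted data must abort as well.
                for (String reader : downstreamReaders.getOrDefault(txn, List.of())) {
                    pending.push(reader);
                }
            }
        }
        return toAbort;
    }

    public static void main(String[] args) {
        Map<String, List<String>> readers = Map.of(
                "subtopology0-txn7", List.of("subtopology1-txn3"),
                "subtopology1-txn3", List.of("subtopology2-txn5", "subtopology2-txn6"));
        System.out.println(transactionsToAbort("subtopology0-txn7", readers));
    }
}
```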

4. Idempotent Producer:

The transactional part automatically leverages the idempotent properties
of the producer. In fact, idempotency is a requirement:

> Note that enable.idempotence must be enabled if a TransactionalId is 
> configured.


All idempotent retries are handled by the producer internally (with or
without transactions) if enable.idempotence is set to true.
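The requirement quoted from KIP-98 can be expressed as a simple configuration check. The sketch below uses the producer config names enable.idempotence and transactional.id, but the validate helper is hypothetical: it only mirrors the quoted rule and is not the producer's own validation code. The transactional id value is a placeholder.

```java
import java.util.Properties;

// Hypothetical check mirroring the quoted KIP-98 rule:
// a transactional.id may only be configured together with enable.idempotence=true.
public final class IdempotenceCheck {

    public static void validate(Properties producerProps) {
        String transactionalId = producerProps.getProperty("transactional.id");
        // Assumption: a missing setting is treated as disabled here; the client's
        // actual default may differ between versions.
        boolean idempotent = Boolean.parseBoolean(
                producerProps.getProperty("enable.idempotence", "false"));
        if (transactionalId != null && !idempotent) {
            throw new IllegalArgumentException(
                    "transactional.id '" + transactionalId + "' requires enable.idempotence=true");
        }
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("enable.idempotence", "true");      // producer de-duplicates its own retries
        props.put("transactional.id", "my-app-0_1");  // placeholder id, one per producer
        validate(props);
        System.out.println("config ok");
    }
}
```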


On 3/3/17 3:34 AM, Eno Thereska wrote:
> Another question: 
> The KIP doesn’t exactly spell out how it uses the idempotence guarantee from 
> KIP-98. It seems that only the transactional part is needed. Or is the 
> idempotence guarantee working behind the scenes and helping for some 
> scenarios for which it is not worthwhile aborting a transaction (e.g., 
> retransmitting a record after a temporary network glitch)?
> Thanks
> Eno
>> On Mar 2, 2017, at 4:56 PM, Jay Kreps <j...@confluent.io> wrote:
>> I second the concern with the one producer per task approach. At a
>> high-level it seems to make sense but I think Damian is exactly right that
>> that cuts against the general design of the producer. Many people have high
>> input partition counts and will have high task counts as a result. I think
>> processing 1000 partitions should not be an unreasonable thing to want to
>> do.
>> The tricky bits will be:
>>   - Reduced effectiveness of batching (or more latency and memory to get
>>   equivalent batching). This doesn't show up in simple benchmarks because
>>   much of the penalty is I/O and CPU on the broker and the additional threads
>>   from all the producers can make a single-threaded benchmark seem faster.
>>   - TCP connection explosion. We maintain one connection per broker. This
>>   is already high since each app instance does this. This design though will
>>   add an additional multiplicative factor based on the partition count of the
>>   input.
>>   - Connection and metadata request storms. When an instance with 1000
>>   tasks starts up it is going to try to create many thousands of connections
>>   and issue a thousand metadata requests all at once.
>>   - Memory usage. We currently default to 64MB per producer. This can be
>>   tuned down, but the fact that we are spreading the batching over more
>>   producers will fundamentally mean we need a lot more memory to get good
>>   perf and the memory usage will change as your task assignment changes so it
>>   will be hard to set correctly unless it is done automatically.
>>   - Metrics explosion (1000 producer instances, each with their own
>>   metrics to monitor).
>>   - Thread explosion, 1000 background threads, one per producer, each
>>   sending data.
>> -Jay
>> On Wed, Mar 1, 2017 at 3:05 AM, Damian Guy <damian....@gmail.com> wrote:
>>> Hi Guozhang,
>>> Thanks for the KIP! This is an important feature for Kafka Streams and will
>>> help to unlock a bunch of use cases.
>>> I have some concerns/questions:
>>>   1. Producer per task: I'm worried about the overhead this is going to
>>>   put on both the streams app and the Kafka Brokers. You can easily
>>>   imagine an app consuming thousands of partitions. What load will this
>>>   put on the brokers? Am I correct in assuming that there will be
>>>   metadata requests per Producer? The memory overhead in the streams app
>>>   will also increase fairly significantly. Should we adjust
>>>   ProducerConfig.BUFFER_MEMORY_CONFIG?
>>>   2. State Store recovery: As we already know, restoring the entire
>>>   changelog can take an extremely long time. Even with a fairly small
>>>   dataset and an inappropriately tuned segment size, this can take way
>>>   too long. My concern is that failures happen and then recovery takes
>>>   "forever" and we end up in a situation where we need to change the
>>>   max.poll.interval to be some very large number or else we end up in
>>>   "rebalance hell". I don't think this provides a very good user
>>>   experience. You mention RocksDB checkpointing in the doc - should we
>>>   explore this idea some more? i.e., understand the penalty for
>>>   checkpointing. Maybe checkpoint every *n* commits?
>>>   3. What does EoS mean for Caching? If we set the commit interval to
>>>   100ms then the cache is not going to be very effective. Should it just
>>>   be disabled?
>>> Thanks,
>>> Damian
>>> On Tue, 28 Feb 2017 at 21:54 Guozhang Wang <wangg...@gmail.com> wrote:
>>>> Hi all,
>>>> I have just created KIP-129 to leverage KIP-98 in Kafka Streams and
>>>> provide exactly-once processing semantics:
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-129%3A+Streams+Exactly-Once+Semantics
>>>> This KIP enables Streams users to optionally turn on exactly-once
>>>> processing semantics without changing their app code at all by leveraging
>>>> the transactional messaging features provided in KIP-98.
>>>> The above wiki page provides a high-level view of the proposed changes,
>>>> while detailed implementation design can be found in this Google doc:
>>>> https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c
>>>> We would love to hear your comments and suggestions.
>>>> Thanks,
>>>> -- Guozhang
