[ 
https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-8902.
---------------------------------
    Resolution: Fixed

I wrote a simple Streams application using the Processor API to update 10 
stores with every record seen.

{code}
        final int numStores = 10;
        final Topology topology = new Topology();

        topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "table-in");

        topology.addProcessor(
            "processor",
            (ProcessorSupplier<String, String>) () -> new Processor<String, String>() {
                private final List<KeyValueStore<String, String>> stores = new ArrayList<>(numStores);

                @SuppressWarnings("unchecked")
                @Override
                public void init(final ProcessorContext context) {
                    for (int i = 0; i < numStores; i++) {
                        stores.add(i, (KeyValueStore<String, String>) context.getStateStore("store" + i));
                    }
                }

                @Override
                public void process(final String key, final String value) {
                    for (final KeyValueStore<String, String> store : stores) {
                        store.put(key, value);
                    }
                }

                @Override
                public void close() {
                    stores.clear();
                }
            },
            "source"
        );
{code}

I tested this topology using both in-memory and on-disk stores, with caching 
and logging enabled.
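The snippet above omits the store registration step. A minimal sketch of it, continuing the same code (store names "store0" through "store9" match the getStateStore calls in init(); swapping Stores.inMemoryKeyValueStore for Stores.persistentKeyValueStore gives the on-disk variant):

{code}
        // Register one store per index and connect it to the "processor" node.
        // The names must match the context.getStateStore("store" + i) lookups above.
        for (int i = 0; i < numStores; i++) {
            final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("store" + i), // or Stores.persistentKeyValueStore(...)
                        Serdes.String(),
                        Serdes.String()
                    )
                    .withCachingEnabled()                          // caching on, as in the benchmark
                    .withLoggingEnabled(Collections.emptyMap());   // changelog topic with default configs
            topology.addStateStore(storeBuilder, "processor");
        }
{code}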

My benchmark consisted of running one KafkaStreams instance and measuring its 
metrics, while simulating other nodes joining and leaving the cluster (by 
constructing the simulated nodes to participate in the consumer group protocol 
without actually doing any work). I tested three cluster rebalance scenarios:
* scale up: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 
minutes, add one node (each node loses one task), run 2 minutes, add one node 
(each node loses another task), run 2 minutes, add two nodes (each node loses 
one task), end the test at the 10 minute mark
* rolling bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 
minutes, bounce each node in the cluster (waiting for it to join and all nodes 
to return to RUNNING before proceeding), end the test at the 10 minute mark
* full bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4 
minutes, bounce each node in the cluster (without waiting, so they all leave 
and join at once), end the test at the 10 minute mark

For input data, I randomly generated a dataset of 10,000 keys, and another with 
100,000 keys, all with 1kB values. This data was pre-loaded into the broker, 
with compaction and retention disabled (so that every test iteration would get 
the same sequence of updates).
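The generator looked roughly like this — a hypothetical sketch (the keyspace sizes and 1kB values match the description above; the "key-" prefix and class/method names are mine):

{code}
import java.util.Random;

public class DataGen {
    // Build a random printable value of exactly valueBytes characters (1 byte each in ASCII).
    static String randomValue(final Random random, final int valueBytes) {
        final StringBuilder sb = new StringBuilder(valueBytes);
        for (int i = 0; i < valueBytes; i++) {
            sb.append((char) ('a' + random.nextInt(26)));
        }
        return sb.toString();
    }

    // Draw a key uniformly from a fixed keyspace of numKeys keys, so repeated
    // records are updates to existing keys rather than inserts.
    static String randomKey(final Random random, final int numKeys) {
        return "key-" + random.nextInt(numKeys);
    }

    public static void main(final String[] args) {
        final Random random = new Random(42); // fixed seed for reproducibility
        for (int i = 0; i < 5; i++) {
            System.out.println(randomKey(random, 10_000) + " -> "
                + randomValue(random, 1024).substring(0, 8) + "...");
        }
    }
}
{code}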

I ran all the benchmarks on AWS i3.large instances, with a dedicated broker 
node running on a separate i3.large instance.

For each test configuration and scenario, I ran 20 independent trials and 
discarded the high and low results (to exclude outliers), for 18 total data 
points. The key metric was the overall throughput of a single node during the 
test.
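The per-scenario aggregation — drop the single highest and lowest of the 20 trials, then summarize the remaining 18 — can be sketched as (class and method names are mine):

{code}
import java.util.Arrays;

public class TrimmedMean {
    // Discard the single highest and lowest trial results, then average the rest.
    static double trimmedMean(final double[] trials) {
        if (trials.length < 3) {
            throw new IllegalArgumentException("need at least 3 trials");
        }
        final double[] sorted = trials.clone();
        Arrays.sort(sorted);
        double sum = 0.0;
        for (int i = 1; i < sorted.length - 1; i++) { // skip min and max
            sum += sorted[i];
        }
        return sum / (sorted.length - 2);
    }

    public static void main(final String[] args) {
        final double[] throughputs = {90, 100, 110, 105, 95, 500}; // 500 is an outlier
        System.out.println(trimmedMean(throughputs)); // prints 102.5: 90 and 500 dropped
    }
}
{code}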

I compared the above results from:
* 2.3.1-SNAPSHOT (the current head of the 2.3 branch) - Eager protocol
* 2.4.0-SNAPSHOT (the current head of the 2.4 branch) - Cooperative protocol
* a modified 2.4.0-SNAPSHOT with cooperative rebalancing disabled - Eager 
protocol

What I found is that under all scenarios, all three versions performed the same 
under the same data sets and configurations: the differences were not 
statistically significant at the 99.9% confidence level.

I didn't see any marked improvement from cooperative rebalancing alone, but 
this is only the foundation for several follow-on improvements. Just as 
importantly, I also didn't find any regression as a result of the new protocol 
implementation.

> Benchmark cooperative vs eager rebalancing
> ------------------------------------------
>
>                 Key: KAFKA-8902
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8902
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: streams
>            Reporter: Sophie Blee-Goldman
>            Assignee: John Roesler
>            Priority: Major
>             Fix For: 2.4.0
>
>
> Cause rebalance and measure:
> * overall throughput
> * paused time
> * (also look at the metrics from 
> https://issues.apache.org/jira/browse/KAFKA-8609):
> ** accumulated rebalance time
> Cluster/topic sizing:
> * 10 instances
> * 100 tasks (each instance gets 10 tasks)
> * 1000 stores (each task gets 10 stores)
> * standbys = [0 and 1]
> Rolling bounce:
> * with and without state loss
> * shorter and faster than session timeout (shorter in particular should be 
> interesting)
> Expand (from 9 to 10)
> Contract (from 10 to 9)
> With and without saturation:
> EOS:
> * with and without
> Topology:
> * stateful
> * windowed agg
> Key Parameterizations:
> 1. control: no rebalances
> 2. rolling without state loss faster than session timeout
> 3. expand 9 to 10
> 4. contract 10 to 9



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
