[ https://issues.apache.org/jira/browse/KAFKA-8902?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
John Roesler resolved KAFKA-8902.
---------------------------------
Resolution: Fixed
I wrote a simple Streams application using the Processor API to update 10
stores with every record seen.
{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;

final int numStores = 10;

final Topology topology = new Topology();
topology.addSource("source", new StringDeserializer(), new StringDeserializer(), "table-in");
topology.addProcessor(
    "processor",
    (ProcessorSupplier<String, String>) () -> new Processor<String, String>() {
        private final List<KeyValueStore<String, String>> stores = new ArrayList<>(numStores);

        @SuppressWarnings("unchecked")
        @Override
        public void init(final ProcessorContext context) {
            // look up the stores ("store0" .. "store9") connected to this processor
            for (int i = 0; i < numStores; i++) {
                stores.add(i, (KeyValueStore<String, String>) context.getStateStore("store" + i));
            }
        }

        @Override
        public void process(final String key, final String value) {
            // update every store with each record seen
            for (final KeyValueStore<String, String> store : stores) {
                store.put(key, value);
            }
        }

        @Override
        public void close() {
            stores.clear();
        }
    },
    "source"
);
{code}
I tested this topology using both in-memory and on-disk stores, with caching
and logging enabled.
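For completeness, here is a minimal sketch of how the ten stores could be registered and connected to the processor above. The store names "store0" through "store9" follow from the processor's getStateStore() calls; the String serdes and the exact builder calls are assumptions. The persistent variant corresponds to the on-disk (RocksDB) configuration; swapping in Stores.inMemoryKeyValueStore gives the in-memory one.
{code}
// Sketch only: register the ten stores and connect them to "processor".
// Requires org.apache.kafka.streams.state.Stores,
// org.apache.kafka.common.serialization.Serdes, and java.util.Collections.
for (int i = 0; i < numStores; i++) {
    topology.addStateStore(
        Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("store" + i), // or Stores.inMemoryKeyValueStore("store" + i)
                Serdes.String(),
                Serdes.String())
            .withCachingEnabled()
            .withLoggingEnabled(Collections.emptyMap()),
        "processor"
    );
}
{code}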
My benchmark consisted of running one KafkaStreams instance and measuring its
metrics while simulating other nodes joining and leaving the cluster (by
constructing the simulated nodes to participate in the consumer group protocol
without actually doing any work; see the sketch after the scenario list). I
tested three cluster rebalance scenarios:
* scale up: 100 partitions / 10 nodes = 10 tasks per node starting, run 4
minutes, add one node (each node loses one task), run 2 minutes, add one node
(each node loses another task), run 2 minutes, add two nodes (each node loses
one task), end the test at the 10 minute mark
* rolling bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4
minutes, bounce each node in the cluster (waiting for it to join and all nodes
to return to RUNNING before proceeding), end the test at the 10 minute mark
* full bounce: 100 partitions / 10 nodes = 10 tasks per node starting, run 4
minutes, bounce each node in the cluster (without waiting, so they all leave
and join at once), end the test at the 10 minute mark
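The simulated members in the actual harness were hollow participants in the group protocol, not full applications. Purely as an illustration of what drives these scenarios, starting and stopping an extra KafkaStreams instance with the same application.id also makes a member join and leave the group; the class below is a hypothetical sketch, not the benchmark harness, and buildTopology(), the application.id, and the broker address are assumptions.
{code}
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

// Hypothetical sketch: joining/leaving the group by starting/stopping an extra
// instance. The real benchmark used lightweight simulated members that join
// the consumer group protocol without doing any work.
public class ExtraNode {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "rebalance-benchmark"); // assumed app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker

        // buildTopology(): the topology from the snippet above (hypothetical helper)
        final KafkaStreams extraNode = new KafkaStreams(buildTopology(), props);
        extraNode.start();                              // member joins -> rebalance (scale up)
        Thread.sleep(Duration.ofMinutes(2).toMillis()); // hold the assignment for the scenario interval
        extraNode.close();                              // member leaves -> rebalance (contract)
    }
}
{code}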
For input data, I randomly generated a dataset of 10,000 keys, and another with
100,000 keys, all with 1 kB values. This data was pre-loaded into the broker,
with compaction and retention disabled (so that every test iteration would get
the same sequence of updates).
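A sketch of the pre-loading step, under stated assumptions: the topic name "table-in" comes from the topology above, while the record count, key format, fixed seed, and broker address are made up for illustration. The essential points are the random keys, the 1 kB values, and a topic created with cleanup.policy=delete, retention.ms=-1, and retention.bytes=-1 so that neither compaction nor retention removes records between trials.
{code}
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical data generator: random keys drawn from a fixed key space,
// 1 kB values, produced into the (pre-created) "table-in" topic.
public class PreloadInput {
    public static void main(final String[] args) {
        final int numKeys = 10_000;            // or 100_000 for the larger data set
        final int numRecords = 1_000_000;      // assumed volume
        final Random random = new Random(42L); // fixed seed: same sequence every run

        final char[] chars = new char[1024];
        Arrays.fill(chars, 'x');
        final String value = new String(chars); // 1 kB value

        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < numRecords; i++) {
                final String key = "key-" + random.nextInt(numKeys);
                producer.send(new ProducerRecord<>("table-in", key, value));
            }
        }
    }
}
{code}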
I ran all the benchmarks on AWS i3.large instances, with a dedicated broker
node running on a separate i3.large instance.
For each test configuration and scenario, I ran 20 independent trials and
discarded the high and low results (to exclude outliers), for 18 total data
points. The key metric was the overall throughput of a single node during the
test.
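Concretely, the per-trial throughput numbers were trimmed like this before comparison (a trivial sketch; the throughput values themselves come from the node's metrics):
{code}
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class TrimTrials {
    // Drop the single highest and lowest of the 20 trial results,
    // keeping 18 data points per configuration and scenario.
    static List<Double> dropExtremes(final List<Double> trialThroughputs) {
        final List<Double> trimmed = new ArrayList<>(trialThroughputs);
        trimmed.remove(Collections.max(trimmed));
        trimmed.remove(Collections.min(trimmed));
        return trimmed;
    }
}
{code}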
I compared the above results from:
* 2.3.1-SNAPSHOT (the current head of the 2.3 branch) - Eager protocol
* 2.4.0-SNAPSHOT (the current head of the 2.4 branch) - Cooperative protocol
* a modified 2.4.0-SNAPSHOT with cooperative rebalancing disabled - Eager
protocol
What I found is that under all scenarios, all three versions performed the same
(no statistically significant difference at the 99.9% level) under the same
data sets and the same configurations.
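One way to phrase that check over the 18-point samples is a two-sample t-test at the 0.001 significance level; the sketch below uses Apache Commons Math as an assumed tool (the test actually used is not stated here):
{code}
import org.apache.commons.math3.stat.inference.TTest;

public class CompareVersions {
    // Returns true if the two versions' throughput samples differ significantly
    // at alpha = 0.001 (the 99.9% level); false means "performed the same".
    static boolean significantlyDifferent(final double[] eagerThroughputs,
                                          final double[] cooperativeThroughputs) {
        return new TTest().tTest(eagerThroughputs, cooperativeThroughputs, 0.001);
    }
}
{code}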
I didn't see any marked improvement as a result of cooperative rebalancing
alone, but this is only the foundation for several follow-on improvements. What
is very good to know is that I also didn't find any regression as a result of
the new protocol implementation.
> Benchmark cooperative vs eager rebalancing
> ------------------------------------------
>
> Key: KAFKA-8902
> URL: https://issues.apache.org/jira/browse/KAFKA-8902
> Project: Kafka
> Issue Type: Sub-task
> Components: streams
> Reporter: Sophie Blee-Goldman
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.4.0
>
>
> Cause rebalance and measure:
> * overall throughput
> * paused time
> * (also look at the metrics from https://issues.apache.org/jira/browse/KAFKA-8609):
> ** accumulated rebalance time
> Cluster/topic sizing:
> * 10 instances
> * 100 tasks (each instance gets 10 tasks)
> * 1000 stores (each task gets 10 stores)
> * standbys = [0 and 1]
> Rolling bounce:
> * with and without state loss
> * shorter and faster than session timeout (shorter in particular should be
> interesting)
> Expand (from 9 to 10)
> Contract (from 10 to 9)
> With and without saturation:
> EOS:
> * with and without
> Topology:
> * stateful
> * windowed agg
> Key Parameterizations:
> 1. control: no rebalances
> 2. rolling without state loss faster than session timeout
> 3. expand 9 to 10
> 4. contract 10 to 9