Hi all,
Thanks for your remarks.
AS:
On the builder pattern, we agree it's a cleaner approach than both the
state-machine and the two-class strategies. The separation between setup and
execution phases would be made explicit at the API level rather than just at
the implementation level.
Concretely:
// Multi-partition mode (new)
TopologyTestDriver driver = new TopologyTestDriver(topology, config)
    .withMultiPartitionMode()
    .declareTopic("input", 3)
    .declareTopic("output", 3)
    .build();

// Single-partition mode (unchanged)
TopologyTestDriver driver = new TopologyTestDriver(topology, config);

// Same topic declaration
TestInputTopic<String, String> input = driver.createInputTopic(
    "input", Serdes.String().serializer(), Serdes.String().serializer());
TestOutputTopic<String, String> output = driver.createOutputTopic(
    "output", Serdes.String().deserializer(), Serdes.String().deserializer());
createInputTopic, createOutputTopic and pipeInput remain identical in both modes.
So callers that never invoke withMultiPartitionMode() will observe strictly
identical behaviour to the current release — no new code path, no risk of
regression.
LB4:
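Records with a `null` key will be routed to partition 0 if no explicit
partition is set. Keyed records without an explicit partition are hashed with
murmur2 over the declared partition count: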
private int resolvePartition(final String topic,
                             final byte[] keyBytes,
                             final Integer explicit) {
    final int n = Math.max(1, declaredPartitionsByTopic.getOrDefault(topic, 1));
    if (explicit != null) {
        if (explicit < 0 || explicit >= n) {
            throw new IllegalArgumentException(
                "Partition " + explicit + " is out of range for topic '" + topic
                    + "' (has " + n + " partitions). Declare a higher count via "
                    + "declareTopic() if needed.");
        }
        return explicit;
    }
    if (keyBytes == null || n == 1) {
        return 0;
    }
    return Utils.toPositive(Utils.murmur2(keyBytes)) % n;
}
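To make the routing rules easier to try out in isolation, here is a minimal, self-contained sketch of the same decision logic. It is only an illustration under stated assumptions: the class name PartitionRoutingSketch and its declareTopic method are hypothetical, and Arrays.hashCode stands in for Utils.murmur2 so the snippet runs without kafka-clients on the classpath. The partition numbers it yields for keyed records will therefore differ from the real driver.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the driver's routing logic; not part of the KIP.
class PartitionRoutingSketch {
    private final Map<String, Integer> declaredPartitionsByTopic = new HashMap<>();

    void declareTopic(final String topic, final int partitions) {
        declaredPartitionsByTopic.put(topic, partitions);
    }

    int resolvePartition(final String topic, final byte[] keyBytes, final Integer explicit) {
        // Undeclared topics fall back to a single partition.
        final int n = Math.max(1, declaredPartitionsByTopic.getOrDefault(topic, 1));
        if (explicit != null) {
            if (explicit < 0 || explicit >= n) {
                throw new IllegalArgumentException(
                    "Partition " + explicit + " is out of range for topic '" + topic
                        + "' (has " + n + " partitions).");
            }
            return explicit;
        }
        if (keyBytes == null || n == 1) {
            return 0; // null keys are always routed to partition 0
        }
        // ASSUMPTION: Arrays.hashCode replaces Utils.murmur2 to stay dependency-free.
        // Masking with 0x7fffffff clears the sign bit, as Utils.toPositive does.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % n;
    }

    public static void main(final String[] args) {
        final PartitionRoutingSketch router = new PartitionRoutingSketch();
        router.declareTopic("input", 3);
        final byte[] key = "user-1".getBytes(StandardCharsets.UTF_8);
        System.out.println("null key -> " + router.resolvePartition("input", null, null));
        System.out.println("explicit -> " + router.resolvePartition("input", key, 2));
        System.out.println("hashed   -> " + router.resolvePartition("input", key, null));
    }
}
```

The point for test authors is that every branch is a pure function of the topic, key bytes and explicit partition, so repeated runs route a given record identically.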
LB5:
When a single pipeInput causes records to fan out across multiple downstream
tasks, the test driver selects the processable task with the lowest current
stream time. When several tasks share the same stream time (the common case
immediately after a fan-out), the tie is broken by iteration order: the tasks
are stored in a TreeMap<TaskId, StreamTask>, so, as you suggest, the ordering
will be deterministic and stable (e.g. ascending `(subtopologyId, partition)`).
private final TreeMap<TaskId, StreamTask> multiSubTasks = new TreeMap<>();

private StreamTask pickNextProcessableTask() {
    StreamTask task = null;
    long bestTime = Long.MAX_VALUE;
    final long now = mockWallClockTime.milliseconds();
    for (final StreamTask t : multiSubTasks.values()) {
        if (!t.hasRecordsQueued() || !t.isProcessable(now)) {
            continue;
        }
        final long streamTime = t.processorContext().currentStreamTimeMs();
        // Strict '<' keeps the first task in TreeMap order when stream times tie.
        if (streamTime < bestTime) {
            bestTime = streamTime;
            task = t;
        }
    }
    return task;
}
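The tie-breaking behaviour can be checked in isolation with a small, self-contained sketch. The types below (Id, Task, pickNext) are hypothetical stand-ins for TaskId, StreamTask and pickNextProcessableTask, and the sketch assumes an ordering by subtopology first and partition second. It only models the selection loop, not the actual driver.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the selection loop; names are illustrative only.
class TaskOrderingSketch {

    // Stand-in for TaskId, ordered by (subtopologyId, partition).
    static final class Id implements Comparable<Id> {
        final int subtopologyId;
        final int partition;

        Id(final int subtopologyId, final int partition) {
            this.subtopologyId = subtopologyId;
            this.partition = partition;
        }

        @Override
        public int compareTo(final Id other) {
            final int c = Integer.compare(subtopologyId, other.subtopologyId);
            return c != 0 ? c : Integer.compare(partition, other.partition);
        }

        @Override
        public String toString() {
            return subtopologyId + "_" + partition;
        }
    }

    // Stand-in for StreamTask: only the two properties the loop inspects.
    static final class Task {
        final long streamTime;
        final boolean hasRecords;

        Task(final long streamTime, final boolean hasRecords) {
            this.streamTime = streamTime;
            this.hasRecords = hasRecords;
        }
    }

    // Returns the id of the task with the lowest stream time among tasks that
    // have records; ties go to the smallest id because of the strict '<'.
    static Id pickNext(final TreeMap<Id, Task> tasks) {
        Id best = null;
        long bestTime = Long.MAX_VALUE;
        for (final Map.Entry<Id, Task> e : tasks.entrySet()) {
            final Task t = e.getValue();
            if (!t.hasRecords) {
                continue;
            }
            if (t.streamTime < bestTime) {
                bestTime = t.streamTime;
                best = e.getKey();
            }
        }
        return best;
    }

    public static void main(final String[] args) {
        final TreeMap<Id, Task> tasks = new TreeMap<>();
        tasks.put(new Id(1, 0), new Task(5L, true));
        tasks.put(new Id(0, 2), new Task(5L, true));
        tasks.put(new Id(0, 1), new Task(5L, true));
        tasks.put(new Id(0, 0), new Task(5L, false)); // nothing queued, skipped
        System.out.println(pickNext(tasks)); // prints 0_1
    }
}
```

Insertion order does not matter here: the TreeMap sorts by key, so the same set of tasks always produces the same pick.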
We will update the KIP to reflect all of these comments.
Cheers
From: Lucas Brutschy via dev <[email protected]>
Date: Thursday, 21 May 2026 at 14:25
To: [email protected] <[email protected]>
Cc: Alieh Saeedi <[email protected]>; Lucas Brutschy
<[email protected]>
Subject: [EXT] Re: [DISCUSS] KIP-1238: Multipartition for TopologyTestDriver in
Kafka Streams
Hi all,
Thanks for the updates. Sorry for the late response, somehow the
latest response was not delivered to my inbox and I had to fetch it
from the mailing list archives. A few thoughts on the recent points,
and a couple of follow-ups.
AS: The builder pattern is a good direction — it avoids forcing the
user to reason about a mode/state machine, and it lets the existing
single-partition flow stay untouched. The Compatibility section should
state explicitly that existing constructors plus
`createInputTopic/createOutputTopic/pipeInput/getStateStore` remain
source- and binary-compatible for callers that never touch
`withMultiPartitionMode()`, so we have a clear contract for existing
tests.
LB4: How are records with a `null` key routed in multi-partition mode?
The real producer's `DefaultPartitioner` uses sticky partitioning for
null keys, which would make tests non-deterministic. The KIP should
specify the behavior — for example, round-robin in declared partition
order, or requiring an explicit partition on `TestRecord` when the key
is null.
LB5: When a single `pipeInput` causes records to fan out across
multiple downstream tasks, what is the processing order? A documented
deterministic order (e.g. ascending `(subtopologyId, partition)`)
would be valuable to state in the KIP.
Thanks,
Lucas