Hi all,

LB1 / LB2 / LB3 follow-up: Yes, you are right. In multi-partition mode, calling any of getStateStore(), getKeyValueStore(), getSessionStore(), or getWindowStore() without specifying the partition should throw an exception (IllegalStateException). I will update the KIP.
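A minimal sketch of the LB1 / LB2 / LB3 rule, assuming a hypothetical StoreAccess helper (not part of the KIP or of TopologyTestDriver) that keeps one store instance per partition. Following Lucas's remark in the quoted message, it also assumes that global stores keep the no-arg accessor and live under partition 0. Only getStateStore() is shown; the typed variants would apply the same check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: StoreAccess, register() and globalStoreNames are illustrative
// assumptions, not the KIP's API. Stores are typed as Object to stay self-contained.
final class StoreAccess {
    private final boolean multiPartitionMode;
    private final Set<String> globalStoreNames;
    // store name -> (partition -> store instance)
    private final Map<String, Map<Integer, Object>> storesByName = new HashMap<>();

    StoreAccess(final boolean multiPartitionMode, final Set<String> globalStoreNames) {
        this.multiPartitionMode = multiPartitionMode;
        this.globalStoreNames = globalStoreNames;
    }

    void register(final String name, final int partition, final Object store) {
        storesByName.computeIfAbsent(name, k -> new HashMap<>()).put(partition, store);
    }

    // No-arg lookup: unambiguous only in single-partition mode or for global stores
    // (global stores are assumed to be registered under partition 0).
    Object getStateStore(final String name) {
        if (multiPartitionMode && !globalStoreNames.contains(name)) {
            throw new IllegalStateException("Store '" + name
                + "' has one instance per partition in multi-partition mode;"
                + " use getStateStore(name, partition) instead");
        }
        return getStateStore(name, 0);
    }

    // Explicit-partition lookup; returns null if the store or partition is unknown.
    Object getStateStore(final String name, final int partition) {
        final Map<Integer, Object> byPartition = storesByName.get(name);
        return byPartition == null ? null : byPartition.get(partition);
    }
}
```

Throwing instead of silently falling back to partition 0 makes a test that forgot about partitioning fail loudly rather than pass against the wrong store instance.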
LB4: I think we can indeed implement a round-robin mode for null keys to align more closely with the default partitioner implementation. I will update the KIP.

LB6: We can make TestRecord.equals() and hashCode() exclude the partition field. Partition equality must then be asserted explicitly via getPartition() when needed. This preserves full backward compatibility: existing assertEquals assertions on TestRecord remain valid in multi-partition mode.

What do you think?

Cheers

On Fri, May 29, 2026 at 3:16 PM Lucas Brutschy via dev <[email protected]> wrote:
> Hi all,
>
> Thanks Sébastien for the follow-ups. From my side this is ready for a
> vote, although I do have some more comments below. These are all nits
> and non-blocking concerns, so I am happy to +1 the KIP even if we
> haven't converged on a solution for the items below.
>
> LB1 / LB2 / LB3 follow-up: Just to confirm one detail: in
> multi-partition mode, the no-arg getStateStore/getKeyValueStore/etc.
> throws, right? That seems like the only sane choice but it would be
> worth stating in the KIP. Except for global stores - where I need to
> use the no-arg version to access partition 0, assuming you don't have
> to declare partitions for global stores; I'm not sure this is explicit
> enough in the KIP.
>
> LB4 follow-up: Routing all null-key records to partition 0 keeps
> things deterministic, but it diverges from Kafka's DefaultPartitioner,
> which spreads null keys across partitions via uniform-sticky. The
> practical effect is that null-key tests in multi-partition mode will
> only ever exercise one task, which seems like it could mask the very
> fan-out the KIP is meant to enable. Wouldn't round-robin be more
> realistic?
>
> LB6: Assuming the partition in TestRecord is part of its equals, I can
> no longer write assertEquals(new TestRecord<>("k", "v"),
> output.readRecord()), because the partitions will not match. I'm not
> sure if I have a solution.
>
> Thanks,
> Lucas
>
> On Wed, May 27, 2026 at 5:35 PM Sebastien Viale
> <[email protected]> wrote:
> >
> > Hi all,
> >
> > Thanks for your remarks.
> >
> > AS:
> > On the builder pattern, we agree it's a cleaner approach than both the
> > state-machine and the two-class strategies. The separation between setup
> > and execution phases would be made explicit at the API level rather than
> > just at the implementation level.
> > Concretely:
> >
> > // Multi-partition mode (new)
> > TopologyTestDriver driver = new TopologyTestDriver(topology, config)
> >         .withMultiPartitionMode()
> >         .declareTopic("input", 3)
> >         .declareTopic("output", 3)
> >         .build();
> >
> > // Single-partition mode (unchanged)
> > TopologyTestDriver driver = new TopologyTestDriver(topology, config);
> >
> > // Topic creation is the same in both modes
> > TestInputTopic<String, String> input = driver.createInputTopic(
> >         "input", Serdes.String().serializer(), Serdes.String().serializer());
> > TestOutputTopic<String, String> output = driver.createOutputTopic(
> >         "output", Serdes.String().deserializer(), Serdes.String().deserializer());
> >
> > createInputTopic, createOutputTopic and pipeInput remain identical in
> > both modes, so callers that never invoke withMultiPartitionMode() will
> > observe strictly identical behaviour to the current release: no new code
> > path, no risk of regression.
> >
> > LB4:
> > Records with a `null` key will be routed to partition 0 if no
> > explicit partition is set.
> >
> > private int resolvePartition(final String topic, final byte[] keyBytes, final Integer explicit) {
> >     final int n = Math.max(1, declaredPartitionsByTopic.getOrDefault(topic, 1));
> >     if (explicit != null) {
> >         // An explicit partition wins, but must fall within the declared count
> >         if (explicit < 0 || explicit >= n) {
> >             throw new IllegalArgumentException(
> >                 "Partition " + explicit + " is out of range for topic '" + topic
> >                 + "' (has " + n + " partitions). Declare a higher count via declareTopic() if needed.");
> >         }
> >         return explicit;
> >     }
> >     // Null key, or a single-partition topic: always partition 0
> >     if (keyBytes == null || n == 1) {
> >         return 0;
> >     }
> >     // Keyed records: murmur2 hash modulo partition count, as in Kafka's default partitioner
> >     return Utils.toPositive(Utils.murmur2(keyBytes)) % n;
> > }
> >
> >
> > LB5:
> > When a single pipeInput causes records to fan out across multiple
> > downstream tasks, the test driver selects the task with the lowest current
> > stream time.
> > When multiple tasks share the same stream time (which is the common case
> > immediately after a fan-out), the tie is broken by TaskId: the tasks are
> > stored in a TreeMap<TaskId, StreamTask>, so, as you suggest, the ordering
> > will be deterministic and stable (e.g. ascending `(subtopologyId, partition)`).
> >
> > private final TreeMap<TaskId, StreamTask> multiSubTasks = new TreeMap<>();
> >
> > private StreamTask pickNextProcessableTask() {
> >     StreamTask task = null;
> >     long bestTime = Long.MAX_VALUE;
> >     final long now = mockWallClockTime.milliseconds();
> >     // TreeMap iterates in TaskId order; the strict '<' keeps the lowest TaskId on ties
> >     for (final StreamTask t : multiSubTasks.values()) {
> >         if (!t.hasRecordsQueued() || !t.isProcessable(now)) {
> >             continue;
> >         }
> >         final long streamTime = t.processorContext().currentStreamTimeMs();
> >         if (streamTime < bestTime) {
> >             bestTime = streamTime;
> >             task = t;
> >         }
> >     }
> >     return task;
> > }
> >
> >
> > We will update the KIP to reflect all the comments.
> >
> > Cheers
> >
> >
> > From: Lucas Brutschy via dev <[email protected]>
> > Date: Thursday, May 21, 2026 at 14:25
> > To: [email protected] <[email protected]>
> > Cc: Alieh Saeedi <[email protected]>; Lucas Brutschy <[email protected]>
> > Subject: [EXT] Re: [DISCUSS] KIP-1238: Multipartition for TopologyTestDriver in Kafka Streams
> >
> > Hi all,
> >
> > Thanks for the updates. Sorry for the late response, somehow the
> > latest response was not delivered to my inbox and I had to fetch it
> > from the mailing list archives.
> > A few thoughts on the recent points, and a couple of follow-ups.
> >
> > AS: The builder pattern is a good direction: it avoids forcing the
> > user to reason about a mode/state machine, and it lets the existing
> > single-partition flow stay untouched. The Compatibility section should
> > state explicitly that existing constructors plus
> > `createInputTopic/createOutputTopic/pipeInput/getStateStore` remain
> > source- and binary-compatible for callers that never touch
> > `withMultiPartitionMode()`, so we have a clear contract for existing
> > tests.
> >
> > LB4: How are records with a `null` key routed in multi-partition mode?
> > The real producer's `DefaultPartitioner` uses sticky partitioning for
> > null keys, which would make tests non-deterministic. The KIP should
> > specify the behavior, for example round-robin in declared partition
> > order, or requiring an explicit partition on `TestRecord` when the key
> > is null.
> >
> > LB5: When a single `pipeInput` causes records to fan out across
> > multiple downstream tasks, what is the processing order? A documented
> > deterministic order (e.g. ascending `(subtopologyId, partition)`)
> > would be valuable to state in the KIP.
> >
> > Thanks,
> > Lucas
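The two changes proposed in the reply at the top of this thread (round-robin routing for null keys under LB4, and excluding the partition from TestRecord equality under LB6) can be sketched as follows. This is illustrative only: NullKeyRoundRobin and RecordSketch are hypothetical names, RecordSketch is a simplified stand-in for TestRecord that models only key, value and partition, and starting the rotation at partition 0 is an assumption. Keeping one counter per topic keeps null-key routing deterministic within a test run, which was Lucas's original concern, while still spreading records over every declared partition.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical: deterministic round-robin routing for null-key records (LB4).
final class NullKeyRoundRobin {
    // Next partition to assign, tracked separately for each topic.
    private final Map<String, Integer> nextByTopic = new HashMap<>();

    int nextPartition(final String topic, final int declaredPartitions) {
        final int n = Math.max(1, declaredPartitions);
        final int partition = nextByTopic.getOrDefault(topic, 0) % n;
        nextByTopic.put(topic, (partition + 1) % n);
        return partition;
    }
}

// Hypothetical stand-in for TestRecord (LB6): partition is carried but ignored by equality.
final class RecordSketch<K, V> {
    private final K key;
    private final V value;
    private final Integer partition; // null when no partition was assigned

    RecordSketch(final K key, final V value, final Integer partition) {
        this.key = key;
        this.value = value;
        this.partition = partition;
    }

    Integer getPartition() {
        return partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof RecordSketch)) {
            return false;
        }
        final RecordSketch<?, ?> other = (RecordSketch<?, ?>) o;
        // Partition deliberately excluded, so existing assertEquals checks keep passing.
        return Objects.equals(key, other.key) && Objects.equals(value, other.value);
    }

    @Override
    public int hashCode() {
        // Must stay consistent with equals(): the partition is not hashed either.
        return Objects.hash(key, value);
    }
}
```

With equality defined this way, a test that cares about placement checks getPartition() separately, while an existing assertEquals(new TestRecord<>("k", "v"), output.readRecord()) keeps working unchanged.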
