[
https://issues.apache.org/jira/browse/KAFKA-9503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
John Roesler resolved KAFKA-9503.
---------------------------------
Resolution: Fixed
> TopologyTestDriver processes intermediate results in the wrong order
> --------------------------------------------------------------------
>
> Key: KAFKA-9503
> URL: https://issues.apache.org/jira/browse/KAFKA-9503
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
>
> TopologyTestDriver has the feature that it processes each input
> synchronously, resolving one of the most significant challenges with
> verifying the correctness of streaming applications.
> When processing an input, it feeds that record to the source node, which then
> synchronously (it's always synchronous within a task) gets passed through the
> subtopology via Context#forward calls. Ultimately, outputs from that input
> are forwarded into the RecordCollector, which converts them into
> Producer.send calls. In TopologyTestDriver, this Producer is a special one
> that simply captures the records instead of sending them to a broker.
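> (I believe the driver uses Kafka's MockProducer for this internally, but the
> essence of a capturing producer, with illustrative names, is just:)

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the driver's capturing producer: send() does no I/O,
// it only remembers what was produced so the driver can inspect it later.
public class CapturingProducer {
    record ProducerRecord(String topic, String key, String value) {}

    private final List<ProducerRecord> history = new ArrayList<>();

    public void send(ProducerRecord record) {
        history.add(record); // "sending" is just recording
    }

    public List<ProducerRecord> history() {
        return List.copyOf(history);
    }

    public static void main(String[] args) {
        CapturingProducer p = new CapturingProducer();
        p.send(new ProducerRecord("out", "a1", "v1"));
        p.send(new ProducerRecord("repartition-topic", "b1", "a1"));
        System.out.println(p.history().size()); // prints 2
    }
}
```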
> Some output topics from one subtopology are inputs to another subtopology.
> For example, repartition topics. Immediately after the synchronous
> subtopology process() invocation, TopologyTestDriver iterates over the
> collected outputs from the special Producer. If they are records for pure
> output topics, it just enqueues them for later retrieval by testing code. If
> they are records for internal topics, though, TopologyTestDriver immediately
> processes them as inputs to the relevant subtopology.
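> Schematically (with made-up names, not the driver's real internals), the
> dispatch described above looks like this; note how processing an internal
> record recurses before the remaining captured records are drained:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the dispatch described above: internal-topic records are
// re-processed immediately (and recursively); everything else is buffered
// for the test code to read later.
public class RecursiveDispatch {
    record Rec(String topic, String value) {}

    final List<Rec> outputBuffer = new ArrayList<>();

    // Stand-in for piping a record through a subtopology: records on the
    // internal "sub" topic produce an "out" record here.
    List<Rec> process(Rec r) {
        return r.topic().equals("sub")
                ? List.of(new Rec("out", r.value() + "'"))
                : List.of();
    }

    boolean isInternal(Rec r) {
        return r.topic().equals("sub");
    }

    void handleCaptured(List<Rec> captured) {
        for (Rec r : captured) {
            if (isInternal(r)) {
                handleCaptured(process(r)); // recursion: a stack, not a queue
            } else {
                outputBuffer.add(r);
            }
        }
    }

    public static void main(String[] args) {
        RecursiveDispatch d = new RecursiveDispatch();
        // One input produced an internal record and a (stale) output record,
        // in that order:
        d.handleCaptured(List.of(new Rec("sub", "a"), new Rec("out", "stale")));
        // The record derived from "sub" has jumped ahead of "stale":
        System.out.println(d.outputBuffer);
    }
}
```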
> The problem, and this is very subtle, is that TopologyTestDriver does this
> recursively, which with some (apparently rare) programs can cause the output
> to be observed in an invalid order.
> One such program is the one I wrote to test the fix for KAFKA-9487. It
> involves a foreign-key join whose result is joined back to one of its inputs.
> Here's a simplified version:
> {noformat}
> // foreign key join
> J = A.join(B, (extractor) a -> a.b, (joiner) (a,b) -> new Pair(a, b))
> // equi-join
> OUT = A.join(J, (joiner) (a, j) -> new Pair(a, j))
> Let's say we have the following initial condition:
> A:
> a1 = {v: X, b: b1}
> B:
> b1 = {v: Y}
> J:
> a1 = Pair({v: X, b: b1}, {v: Y})
> OUT:
> a1 = Pair({v: X, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> Now, piping an update:
> a1: {v: Z, b: b1}
> results immediately in two buffered results in the Producer:
> (FK join subscription): b1: {a1}
> (OUT): a1 = Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> Note that the FK join result isn't updated synchronously, since it's an async
> operation, so the RHS lookup is temporarily incorrect, yielding the nonsense
> intermediate result where the outer pair has the updated value for a1, but
> the inner (fk result) one still has the old value for a1.
> However! We don't buffer that output record for consumption by testing code
> yet; we leave it in the internal Producer while we process the first
> intermediate record (the FK subscription).
> Processing that internal record means that we have a new internal record to
> process:
> (FK join subscription response): a1: {b1: {v: Y}}
> so right now, our internal-records-to-process stack looks like:
> (FK join subscription response): a1: {b1: {v: Y}}
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> Again, we start by processing the first thing, the FK join response, which
> results in an updated FK join result:
> (J) a1: Pair({v: Z, b: b1}, {v: Y})
> and output:
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> and, we still haven't handled the earlier output, so now our
> internal-records-to-process stack looks like:
> (J) a1: Pair({v: Z, b: b1}, {v: Y})
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> At this point, there's nothing else to process in internal topics, so we just
> copy the records one by one to the "output" collection for later handling by
> testing code, but this yields the wrong final state of:
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: X, b: b1}, {v: Y}))
> That was an incorrect intermediate result, but because we're processing
> internal records recursively (as a stack), it winds up emitted at the end
> instead of in the middle.
> If we change the processing model from a stack to a queue, the correct order
> is preserved, and the final state is:
> (OUT) a1: Pair({v: Z, b: b1}, Pair({v: Z, b: b1}, {v: Y}))
> {noformat}
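> The stack-vs-queue difference can be demonstrated with a toy drain loop
> (hypothetical record names standing in for the subscription, response, and
> OUT records above):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy drain loop contrasting the old (stack) order with the fixed (queue)
// order. "SUB" and "RESP" stand for the FK subscription and response
// records; "OUT:..." records are user-visible output.
public class StackVsQueue {

    // What processing each internal record produces, in order.
    static List<String> process(String r) {
        return switch (r) {
            case "SUB"  -> List.of("RESP");      // subscription -> response
            case "RESP" -> List.of("OUT:fresh"); // response -> updated result
            default     -> List.of();
        };
    }

    static boolean isInternal(String r) {
        return !r.startsWith("OUT:");
    }

    static List<String> drain(List<String> captured, boolean asStack) {
        Deque<String> work = new ArrayDeque<>(captured);
        List<String> output = new ArrayList<>();
        while (!work.isEmpty()) {
            String r = work.pollFirst();
            if (!isInternal(r)) {
                output.add(r); // buffer for the testing code
                continue;
            }
            List<String> produced = process(r);
            if (asStack) {
                // old behavior: newly produced records cut in line
                for (int i = produced.size() - 1; i >= 0; i--) {
                    work.addFirst(produced.get(i));
                }
            } else {
                // fix: first captured, first processed
                produced.forEach(work::addLast);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> captured = List.of("SUB", "OUT:stale");
        System.out.println("stack: " + drain(captured, true));  // stale last (wrong)
        System.out.println("queue: " + drain(captured, false)); // fresh last (right)
    }
}
```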
> This is what I did in https://github.com/apache/kafka/pull/8015
--
This message was sent by Atlassian Jira
(v8.3.4#803005)