[
https://issues.apache.org/jira/browse/KAFKA-14173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17725356#comment-17725356
]
Matthias J. Sax commented on KAFKA-14173:
-----------------------------------------
Just discovering this ticket.
I guess you would need to use `TestInputTopic#advanceTime` for this case?
Closing the ticket as "no an issue", as the API is there. Feel free to follow
up.
> TopologyTestDriver does not use mock wall clock time when sending test records
> ------------------------------------------------------------------------------
>
> Key: KAFKA-14173
> URL: https://issues.apache.org/jira/browse/KAFKA-14173
> Project: Kafka
> Issue Type: Bug
> Components: streams-test-utils
> Affects Versions: 2.3.1
> Reporter: Guido Josquin
> Priority: Minor
>
> I am trying to test a stream-stream join with `TopologyTestDriver`. My goal
> is to confirm that my topology performs the following left join correctly.
> {code:java}
> bills
> .leftJoin(payments)(
> {
> case (billValue, null) => billValue
> case (billValue, paymentValue) => (billValue.toInt -
> paymentValue.toInt).toString
> },
> JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(100))
> )
> .to("debt")
> {code}
>
> In other words, if we see a `bill` and a `payment` within 100ms, the payment
> should be subtracted from the bill. If we do not see a payment, the debt is
> simply the bill.
> Here is the test code.
> {code:java}
> val simpleLeftJoinTopology = new SimpleLeftJoinTopology
> val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
> val serde = Serdes.stringSerde
> val bills = driver.createInputTopic("bills", serde.serializer,
> serde.serializer)
> val payments = driver.createInputTopic("payments", serde.serializer,
> serde.serializer)
> val debt = driver.createOutputTopic("debt", serde.deserializer,
> serde.deserializer)
> bills.pipeInput("fred", "100")
> bills.pipeInput("george", "20")
> payments.pipeInput("fred", "95")
> // When in doubt, sleep twice
> driver.advanceWallClockTime(Duration.ofMillis(500))
> Thread.sleep(500)
> // Send a new record to cause the previous window to be closed
> payments.pipeInput("percy", "0")
> val keyValues = debt.readKeyValuesToList()
> keyValues should contain theSameElementsAs Seq(
> // This record is present
> new KeyValue[String, String]("fred", "5"),
> // This record is missing
> new KeyValue[String, String]("george", "20")
> )
> {code}
> Full code available at [https://github.com/Oduig/kstreams-left-join-example]
> Is seems that advancing the wall clock time, sleeping, or sending an extra
> record, never triggers the join condition when data only arrives on the left
> side. It is possible to circumvent this by passing an explicit event time
> with each test record. (See
> https://stackoverflow.com/questions/73443812/using-kafka-streams-topologytestdriver-how-to-test-left-join-between-two-strea/73540161#73540161)
>
> However, the behavior deviates from a real Kafka broker. With a real broker,
> if we do not send an event, it uses the wall clock time of the broker
> instead. The behavior under test should be the same:
> `driver.advanceWallClockTime` should provide the default time to be used for
> `TestTopic.pipeInput`, when no other time is specified.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)