Hi Rion,

Can you run the pipeline asynchronously and inject messages after it has
started? We use this approach for some tests against Cloud PubSub.
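Here's a minimal, stdlib-only sketch of that ordering. It is a stand-in built on my own assumptions, not Beam or kafka-junit code: the class AsyncInjection is hypothetical, an in-memory BlockingQueue plays the Kafka topic, and a background thread plays the running pipeline. In a real test you'd start the Beam pipeline without blocking and write to the embedded broker with a KafkaProducer instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in: the queue plays the Kafka topic, the worker thread plays the pipeline.
final class AsyncInjection {
    private static final long END = Long.MIN_VALUE; // hypothetical "end of test" marker

    static List<Long> runThenInject(long[] eventTimes, long pauseMillis) throws Exception {
        BlockingQueue<Long> topic = new LinkedBlockingQueue<>();
        CountDownLatch running = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // 1. Start the consumer first. submit() returns immediately, much like
            //    pipeline.run() on a runner that does not block.
            Future<List<Long>> consumer = executor.submit(() -> {
                running.countDown();
                List<Long> received = new ArrayList<>();
                while (true) {
                    long ts = topic.take(); // blocks until a record is injected
                    if (ts == END) {
                        return received;
                    }
                    received.add(ts);
                }
            });
            // 2. Wait until the consumer is actually running.
            if (!running.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer did not start");
            }
            // 3. Only now inject records, one at a time, with an artificial pause.
            for (long ts : eventTimes) {
                topic.put(ts);
                Thread.sleep(pauseMillis);
            }
            topic.put(END);
            return consumer.get(5, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Calling runThenInject(new long[] {1, 25, 3}, 10) returns the three timestamps in injection order; the point is only that the consumer is already running before the first record is written.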
Note that if you're using the DirectRunner, you need to set the blockOnRun
pipeline option to false so that pipeline.run() returns immediately
instead of blocking until the pipeline finishes.
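Once the records do arrive with gaps, it may help to work out up front what the test should assert. The sketch below is a simplified model, not Beam's implementation, and its assumptions are mine: fixed windows, a watermark equal to the maximum event time seen minus a fixed delay, and a watermark that only advances after a whole batch of records has been read. LatenessModel and its Status values are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (assumption, not Beam's code) of fixed windows plus allowed lateness.
final class LatenessModel {
    // Hypothetical labels for what happens to each record.
    enum Status { ON_TIME, LATE_ALLOWED, DROPPED }

    static Status classify(long ts, long watermark, long windowSize, long allowedLateness) {
        long windowStart = Math.floorDiv(ts, windowSize) * windowSize;
        long maxTimestamp = windowStart + windowSize - 1; // last instant inside the window
        if (watermark <= maxTimestamp) {
            return Status.ON_TIME; // watermark has not passed the end of the window
        }
        if (maxTimestamp + allowedLateness >= watermark) {
            return Status.LATE_ALLOWED; // late, but the window has not expired yet
        }
        return Status.DROPPED; // window end + allowed lateness is behind the watermark
    }

    // Each inner list is one batch of records read before the watermark advances.
    static List<Status> process(List<List<Long>> batches, long windowSize,
                                long allowedLateness, long maxDelay) {
        List<Status> out = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        long maxSeen = Long.MIN_VALUE;
        for (List<Long> batch : batches) {
            for (long ts : batch) {
                out.add(classify(ts, watermark, windowSize, allowedLateness));
                maxSeen = Math.max(maxSeen, ts);
            }
            if (maxSeen != Long.MIN_VALUE) { // avoid overflow before any record is seen
                watermark = Math.max(watermark, maxSeen - maxDelay);
            }
        }
        return out;
    }
}
```

With 10-unit windows, an allowed lateness of 5 and no delay, feeding 1, 25 and 3 as one batch (everything injected before the pipeline starts) marks all three ON_TIME. Feeding them as three separate batches marks 3 as DROPPED, because the watermark has already reached 25 and the window ending at 9 expired at 14. Under these assumptions, that is the difference between pre-injecting and injecting after start.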

Brian

On Mon, Feb 8, 2021 at 2:10 PM Rion Williams wrote:

> Hey folks,
>
> I’ve been working on fleshing out a proof-of-concept pipeline that deals
> with some out-of-order data (i.e., mismatched processing times and event
> times) and does quite a bit of windowing depending on the data. Most of
> the work I’ve done across streaming systems relies heavily on
> well-written tests, both unit and integration. Beam’s existing constructs,
> like TestStream, are incredible; however, I’ve been trying to mimic a
> production pipeline more closely.
>
> I’ve been using kafka-junit [1], and it’s worked incredibly well; the
> KafkaIO source consumes from it as expected. However, it requires that
> the data be injected into the topic before the pipeline starts:
>
> ```java
> // Seed the topic first (kafka-junit)
> injectIntoTopic(...);
>
> pipeline
>     .apply(
>         KafkaIO.read(...)
>     );
>
> // Then start the pipeline
> pipeline.run();
> ```
>
> As a result, all of the data is consumed at once, so allowed lateness
> doesn’t behave the way I’d expect. So my question is this:
>
> Is it possible to create a unit test to send records to a Kafka topic with
> some given (even artificial) delay so that I could verify things like
> allowed lateness within a pipeline?
>
> Essentially, I’d like something like a TestStream, with all of its
> notions such as “advanceProcessingTime”, combined with inserting those
> delayed records into Kafka.
>
> Does that make sense? Or should I just rely on TestStream and bypass
> Kafka entirely? That doesn’t seem like the right thing to do, but I
> don’t currently see a way around this (I’ve tried setting timestamps at
> the ProducerRecord level, adding sleeps between records, etc.).
>
> Any advice would be appreciated!
>
> [1] https://github.com/salesforce/kafka-junit
>
