Hey Brian,

So that’s an approach that was thinking about last night and I’m sure it could 
work (I.e. running the pipeline and shortly after sending through messages). 
I’m assuming that as long as the pipeline is running, I should be able to apply 
a PAssert against it to verify that something happened after injecting the data 
and passing/failing the test.

Another question that seems to be baffling me is - is it possible to simulate 
processing time advances using Kafka in a scenario like this? I know that I can 
use a TestStream with its “advanceProcessingTime()” constructs to simulate this 
in a stream, but doing so in a test scenario would circumvent Kafka entirely 
which seems to defeat the purpose of the test.

Ideally what I’d want to accomplish is:
- define a series of records with appropriate timestamps to send through my 
pipeline
- window against those, verifying allowed lateness is applied/messages dropped 
accordingly
- assert against the output of the windows

Thanks so much for the response, I do appreciate it.

> On Feb 9, 2021, at 11:08 AM, Brian Hulette <bhule...@google.com> wrote:
> 
> 
> Hi Rion,
> 
> Can you run the pipeline asynchronously and inject messages after it has 
> started? We use this approach for some tests against Cloud PubSub. 
> Note if using the DirectRunner you need to set the blockOnRun pipeline option 
> to False to do this.
> 
> Brian
> 
>> On Mon, Feb 8, 2021 at 2:10 PM Rion Williams <rionmons...@gmail.com> wrote:
>> Hey folks,
>> 
>> I’ve been working on fleshing out a proof-of-concept pipeline that deals 
>> with some out of order data (I.e. mismatching processing times / 
>> event-times) and does quite a bit of windowing depending on the data. Most 
>> of the work I‘ve done in a lot of streaming systems relies heavily on 
>> well-written tests, both unit and integration. Beam’s existing constructs 
>> like TestStreams are incredible, however I’ve been trying to more closely 
>> mimic a production pipeline. 
>> 
>> I’ve been using kafka-junit [1] and it’s worked incredibly well and can be 
>> consumed by the KafkaIO source as expected. This works great, however it 
>> requires that the data is injected into the topic prior to starting the 
>> pipeline:
>> 
>> ```
>> injectIntoTopic(...)
>> 
>> pipeline
>>     .apply(
>>         KafkaIO.read(...)
>>     )
>> 
>> pipeline.run()
>> ```
>> 
>> This results in all of the data being consumed at once and thus not treating 
>> allowed lateness like I would imagine. So my question is this:
>> 
>> Is it possible to create a unit test to send records to a Kafka topic with 
>> some given (even artificial) delay so that I could verify things like 
>> allowed lateness within a pipeline?
>> 
>> Essentially like using a TestStream with all of its notions of 
>> “advanceProcessingTime” in conjunction with inserting those delayed records 
>> into Kafka.
>> 
>> Does that make sense? Or should I just rely on the use of a TestStream and 
>> circumvent Kafka entirely? It doesn’t seem like the right thing to do, but I 
>> don’t currently see a way to work around this (I’ve tried defining 
>> timestamps at the ProducerRecord level, adding sleeps between records, etc.)
>> 
>> Any advice would be appreciated!
>> 
>> [1] : https://github.com/salesforce/kafka-junit

Reply via email to