Can you reproduce it if you replace your Pubsub source with a TestStream
and verify with PAssert [1]? This would enable you to easily build a unit
test. You could even open a pull request adding that to the test suite for
GroupIntoBatches [2]. That would be an excellent contribution to Beam.
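A minimal sketch of such a test follows. It assumes JUnit 4 and the Beam DirectRunner on the test classpath. The class name, the method name and the `elem` helper are hypothetical, and none of it is copied from GroupIntoBatchesTest. It replays the seven timestamped elements from the Mar 2 report through the same 1-second fixed window with batch size 5. Each batch is sorted before asserting so the check does not depend on the order of elements inside a batch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

// Hypothetical test class: replays the reported input with TestStream instead of Pub/Sub.
public class GroupIntoBatchesTestStreamTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();

  // Hypothetical helper: one key/value element with an explicit event timestamp (millis).
  private static TimestampedValue<KV<String, String>> elem(String key, String value, long millis) {
    return TimestampedValue.of(KV.of(key, value), new Instant(millis));
  }

  @Test
  public void emitsEveryKeyWhenBatchSizeExceedsElementsPerKey() {
    // The same inputs and timestamps as in the report; all fall into one 1-second window.
    TestStream<KV<String, String>> input =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
            .addElements(
                elem("a", "1", 1582994620366L),
                elem("c", "2", 1582994620367L),
                elem("e", "3", 1582994620367L),
                elem("d", "4", 1582994620367L),
                elem("e", "5", 1582994620367L),
                elem("b", "6", 1582994620368L),
                elem("a", "7", 1582994620368L))
            // Advancing the watermark closes the window, so incomplete batches must be flushed.
            .advanceWatermarkToInfinity();

    PCollection<String> batches =
        p.apply(input)
            .apply(Window.<KV<String, String>>into(FixedWindows.of(Duration.standardSeconds(1))))
            .apply(GroupIntoBatches.<String, String>ofSize(5))
            // Render each batch as "key-v1,v2" with values sorted, e.g. "a-1,7".
            .apply(
                MapElements.into(TypeDescriptors.strings())
                    .via(
                        (KV<String, Iterable<String>> kv) -> {
                          List<String> values = new ArrayList<>();
                          kv.getValue().forEach(values::add);
                          Collections.sort(values);
                          return kv.getKey() + "-" + String.join(",", values);
                        }));

    // No key reaches 5 elements, so every key should appear exactly once, flushed at window end.
    PAssert.that(batches).containsInAnyOrder("a-1,7", "b-6", "c-2", "d-4", "e-3,5");
    p.run();
  }
}
```

If c-2 and d-4 go missing on the DirectRunner, this assertion should fail in the same way, which makes the bug reproducible without Pub/Sub.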

Kenn

[1] https://beam.apache.org/blog/2016/10/20/test-stream.html
[2] https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java

On Mon, Mar 2, 2020 at 9:25 AM Vasu Gupta <[email protected]> wrote:

> Input : a-1, Timestamp : 1582994620366
> Input : c-2, Timestamp : 1582994620367
> Input : e-3, Timestamp : 1582994620367
> Input : d-4, Timestamp : 1582994620367
> Input : e-5, Timestamp : 1582994620367
> Input : b-6, Timestamp : 1582994620368
> Input : a-7, Timestamp : 1582994620368
>
> Output : Timestamp : 1582994620367, Key : e-3,5
> Output : Timestamp : 1582994620368, Key : a-1,7
>
> As you can see, c-2 and d-4 are missing; I never received these packets.
>
> On 2020/02/28 18:15:03, Kenneth Knowles <[email protected]> wrote:
> > What are the timestamps on the elements?
> >
> > On Fri, Feb 28, 2020 at 8:36 AM Vasu Gupta <[email protected]> wrote:
> >
> > > Edit: The issue is on the Direct Runner (not "Direction Runner"; that was a typo).
> > > Issue details:
> > > Input data: 7 key-value packets: a-1, a-4, b-3, c-5, d-1, e-4, e-5
> > > Batch size: 5
> > > Expected output: a-1,4, b-3, c-5, d-1, e-4,5
> > > Actual output: batches of irregular size, e.g. a-1, b-5, e-4,5 or a-1,4, c-5
> > > But I always get the correct number of packets with BATCH_SIZE = 1.
> > >
> > > On 2020/02/27 20:40:16, Kenneth Knowles <[email protected]> wrote:
> > > > Can you share some more details? What is the expected output and what
> > > > output are you seeing?
> > > >
> > > > On Thu, Feb 27, 2020 at 9:39 AM Vasu Gupta <[email protected]> wrote:
> > > >
> > > > > Hey folks, I am using the Apache Beam framework in Java with the Direction
> > > > > Runner for local testing. When I use GroupIntoBatches with batch size 1,
> > > > > it works perfectly: the output of the transform is consistent and as
> > > > > expected. But with batch size > 1, the output PCollection has fewer
> > > > > elements than it should.
> > > > >
> > > > > Pipeline flow:
> > > > > 1. A transform for reading from Pub/Sub
> > > > > 2. A transform for making a KV out of the data
> > > > > 3. A fixed-window transform of 1 second
> > > > > 4. The GroupIntoBatches transform
> > > > > 5. And last, logging the resulting Iterables.
> > > > >
> > > > > The weird thing is that batch_size > 1 works fine on the DataflowRunner
> > > > > but not on the DirectRunner. I think the issue might be with timer
> > > > > expiry, since GroupIntoBatches uses BagState internally.
> > > > >
> > > > > Any help will be much appreciated.
> > > > >
> > > >
> > >
> >
>
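To pin down what a correct run should emit for the inputs reported in this thread, here is a small plain-Java model of per-key batching. It is a hypothetical helper, not Beam code and not how GroupIntoBatches is implemented. It assumes that all records fall into a single 1-second window, which holds for the reported timestamps. It also assumes that a batch is emitted as soon as it reaches the batch size and that any incomplete batch is flushed when the window closes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the batches expected from one window; not Beam's implementation. */
public class ExpectedBatches {

  /** Groups "key-value" records (e.g. "a-1") into per-key batches of at most batchSize. */
  public static List<String> expectedBatches(List<String> records, int batchSize) {
    // Per-key buffers, kept in first-seen key order so the result is deterministic.
    Map<String, List<String>> buffers = new LinkedHashMap<>();
    List<String> emitted = new ArrayList<>();
    for (String record : records) {
      int dash = record.indexOf('-');
      String key = record.substring(0, dash);
      String value = record.substring(dash + 1);
      List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
      buffer.add(value);
      if (buffer.size() == batchSize) {
        // Assumption: a full batch is emitted immediately.
        emitted.add(key + "-" + String.join(",", buffer));
        buffer.clear();
      }
    }
    // Assumption: leftovers are flushed as smaller batches when the window closes.
    for (Map.Entry<String, List<String>> entry : buffers.entrySet()) {
      if (!entry.getValue().isEmpty()) {
        emitted.add(entry.getKey() + "-" + String.join(",", entry.getValue()));
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<String> feb28 = Arrays.asList("a-1", "a-4", "b-3", "c-5", "d-1", "e-4", "e-5");
    List<String> mar2 = Arrays.asList("a-1", "c-2", "e-3", "d-4", "e-5", "b-6", "a-7");
    System.out.println(expectedBatches(feb28, 5)); // [a-1,4, b-3, c-5, d-1, e-4,5]
    System.out.println(expectedBatches(mar2, 5)); // [a-1,7, c-2, e-3,5, d-4, b-6]
  }
}
```

With batch size 5 no key ever fills a batch, so every key depends on the end-of-window flush. With batch size 1 every element is emitted immediately and no flush is needed. That contrast is consistent with the observation that only batch sizes above 1 lose data.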
