Great catch, Reza. Dataflow uses a customized C++-based Pubsub source that calculates the watermark differently, so the Java-based PubsubIO may behave differently with respect to the watermark (expected) or may simply have bugs (not desirable).
How are you feeding test data into Pubsub? Are you using TestPubsub [1] or your own wrapper?

Kenn

[1] https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java

On Sat, Mar 7, 2020 at 1:46 AM Reza Rokni <[email protected]> wrote:
> Hi,
>
> There are some known issues with the DirectRunner PubSubIO that are not
> present with the Dataflow Runner. One of them is around watermarks:
>
> https://issues.apache.org/jira/browse/BEAM-7322
>
> Not sure if this is part of the issue here, but it is worth exploring.
>
> When testing, are you sending a small volume of data and then
> stopping, or are you sending data continuously?
>
> Cheers
>
> Reza
>
> On Sat, Mar 7, 2020 at 1:29 PM Kenneth Knowles <[email protected]> wrote:
>
>> Can you reproduce it if you replace your Pubsub source with a TestStream
>> and verify with PAssert [1]? This would enable you to easily build a unit
>> test. You could even open a pull request adding that to the test suite for
>> GroupIntoBatches [2]. That would be an excellent contribution to Beam.
>>
>> Kenn
>>
>> [1] https://beam.apache.org/blog/2016/10/20/test-stream.html
>> [2] https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupIntoBatchesTest.java
>>
>> On Mon, Mar 2, 2020 at 9:25 AM Vasu Gupta <[email protected]> wrote:
>>
>>> Input : a-1, Timestamp : 1582994620366
>>> Input : c-2, Timestamp : 1582994620367
>>> Input : e-3, Timestamp : 1582994620367
>>> Input : d-4, Timestamp : 1582994620367
>>> Input : e-5, Timestamp : 1582994620367
>>> Input : b-6, Timestamp : 1582994620368
>>> Input : a-7, Timestamp : 1582994620368
>>>
>>> Output : Timestamp : 1582994620367, Key : e-3,5
>>> Output : Timestamp : 1582994620368, Key : a-1,7
>>>
>>> As you can see, c-2 and d-4 are missing; I never received these
>>> packets.
>>>
>>> On 2020/02/28 18:15:03, Kenneth Knowles <[email protected]> wrote:
>>> > What are the timestamps on the elements?
>>> >
>>> > On Fri, Feb 28, 2020 at 8:36 AM Vasu Gupta <[email protected]> wrote:
>>> >
>>> > > Edit: The issue is on the Direct Runner (not "Direction Runner"; that was a typo).
>>> > > Issue details:
>>> > > Input data: 7 key-value packets like: a-1, a-4, b-3, c-5, d-1, e-4, e-5
>>> > > Batch size: 5
>>> > > Expected output: a-1,4, b-3, c-5, d-1, e-4,5
>>> > > Getting packets of irregular size, like a-1, b-5, e-4,5 OR a-1,4, c-5, etc.
>>> > > But I always got the correct number of packets with BATCH_SIZE = 1.
>>> > >
>>> > > On 2020/02/27 20:40:16, Kenneth Knowles <[email protected]> wrote:
>>> > > > Can you share some more details? What is the expected output and what
>>> > > > output are you seeing?
>>> > > >
>>> > > > On Thu, Feb 27, 2020 at 9:39 AM Vasu Gupta <[email protected]> wrote:
>>> > > >
>>> > > > > Hey folks, I am using the Apache Beam framework in Java with the Direction Runner
>>> > > > > for local testing purposes. When using GroupIntoBatches with batch size 1,
>>> > > > > it works perfectly fine, i.e. the output of the transform is consistent and
>>> > > > > as expected. But when using it with batch size > 1, the output PCollection has
>>> > > > > less data than it should.
>>> > > > >
>>> > > > > Pipeline flow:
>>> > > > > 1. A transform for reading from Pubsub
>>> > > > > 2. A transform for making a KV out of the data
>>> > > > > 3. A fixed window transform of 1 second
>>> > > > > 4. Applying the GroupIntoBatches transform
>>> > > > > 5. And last, logging the resulting Iterables.
>>> > > > >
>>> > > > > The weird thing is that batch_size > 1 works great when running on
>>> > > > > DataflowRunner but not with DirectRunner. I think the issue might be with
>>> > > > > timer expiry, since GroupIntoBatches uses BagState internally.
>>> > > > >
>>> > > > > Any help will be much appreciated.
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
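To make the expected result concrete, the sketch below is a plain-Java model (not Beam code, and not how GroupIntoBatches is actually implemented) of what GroupIntoBatches with a batch size of 5 should emit for the seven timestamped inputs in the Mar 2 message. It assumes 1-second fixed windows aligned to the epoch and assumes every buffered element is eventually emitted, either when its batch fills or when its window expires. Within-batch order follows arrival order here, although Beam does not guarantee any order inside a batch. `ExpectedBatches`, `Input`, `expectedBatches` and `reportedInputs` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the batches GroupIntoBatches is expected to
// emit. This is NOT Beam code; it only encodes the assumptions stated above.
class ExpectedBatches {

  // One input element: key, value, and event timestamp in epoch milliseconds.
  static final class Input {
    final String key;
    final int value;
    final long timestampMillis;

    Input(String key, int value, long timestampMillis) {
      this.key = key;
      this.value = value;
      this.timestampMillis = timestampMillis;
    }
  }

  // The seven elements from the Mar 2 message (key-value, timestamp).
  static List<Input> reportedInputs() {
    return Arrays.asList(
        new Input("a", 1, 1582994620366L),
        new Input("c", 2, 1582994620367L),
        new Input("e", 3, 1582994620367L),
        new Input("d", 4, 1582994620367L),
        new Input("e", 5, 1582994620367L),
        new Input("b", 6, 1582994620368L),
        new Input("a", 7, 1582994620368L));
  }

  // Assigns each element to an epoch-aligned fixed window, buffers values per
  // (window, key) in arrival order, then splits each buffer into batches of at
  // most batchSize. Every input element ends up in exactly one batch.
  static List<String> expectedBatches(List<Input> inputs, long windowMillis, int batchSize) {
    Map<Long, Map<String, List<Integer>>> byWindow = new LinkedHashMap<>();
    for (Input in : inputs) {
      long windowStart = in.timestampMillis - Math.floorMod(in.timestampMillis, windowMillis);
      byWindow
          .computeIfAbsent(windowStart, w -> new LinkedHashMap<>())
          .computeIfAbsent(in.key, k -> new ArrayList<>())
          .add(in.value);
    }
    List<String> batches = new ArrayList<>();
    for (Map<String, List<Integer>> perKey : byWindow.values()) {
      for (Map.Entry<String, List<Integer>> entry : perKey.entrySet()) {
        List<Integer> values = entry.getValue();
        for (int i = 0; i < values.size(); i += batchSize) {
          List<Integer> batch = values.subList(i, Math.min(i + batchSize, values.size()));
          batches.add(entry.getKey() + ":" + batch);
        }
      }
    }
    return batches;
  }

  public static void main(String[] args) {
    for (String batch : expectedBatches(reportedInputs(), 1000L, 5)) {
      System.out.println(batch);
    }
  }
}
```

All seven timestamps fall in the window starting at 1582994620000, so the model yields one batch per key: a:[1, 7], c:[2], e:[3, 5], d:[4], b:[6]. Those values could serve as the expectations for a PAssert containsInAnyOrder check in the TestStream reproduction Kenn suggests, provided each batch's values are sorted first, since batch contents may arrive in any order.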
