On Mon, Jul 23, 2018 at 2:25 PM John Rudolf Lewis <[email protected]> wrote:
> So I guess I can add a timestamp to the message attributes when I receive
> it from SQS, since there is no such built-in property.
> But what triggers finalizeCheckpoint to be called? So far in my testing, I
> never see that method get called, and hence my messages keep getting
> redelivered.
>

Are you testing with the direct runner? It should be called after the first
stage finishes processing (i.e., once the checkpoint mark has been durably
committed by the runner).

Raghu.

>
>
> On Thu, Jul 19, 2018 at 5:26 PM, Raghu Angadi <[email protected]> wrote:
>
>> A timestamp for a message is fundamental to an element in a PCollection.
>> What do you mean by not knowing the timestamp of a message?
>> There is a finalizeCheckpoint API[1] in UnboundedSource. Does that help?
>> PubSub is also very similar: a message needs to be acked within a timeout,
>> otherwise it will be redelivered to one of the consumers. Pubsub messages
>> are acked inside finalize().
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
>>
>>
>> On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis <[email protected]>
>> wrote:
>>
>>> hmm... made lots of progress on this today. But I need help understanding
>>> something....
>>>
>>> UnboundedSource seems to assume that there is some guarantee of message
>>> ordering, and that you can get the timestamp of the current message, using
>>> UnboundedSource.CheckpointMark to help advance the offset. That seems to
>>> work OK for any source that supports those assumptions. But SQS does not
>>> work this way.
>>>
>>> With a standard SQS queue, there is no guarantee of ordering and there
>>> is no timestamp for a message. With SQS, one needs to call the delete API
>>> using the receipt handle from the message to acknowledge receipt of a
>>> message and prevent its redelivery after the visibility timeout has expired.
>>>
>>> I'm not sure how to adapt these two patterns and would welcome
>>> suggestions.
>>>
>>>
>>>
>>> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré <[email protected]>
>>> wrote:
>>>
>>>> Thx John !
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>>>> > Thank you.
>>>> >
>>>> > I've created a jira ticket to add SQS and have assigned it to
>>>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>>>> >
>>>> > Modified the documentation to show it as in-progress:
>>>> > https://github.com/apache/beam/pull/5995
>>>> >
>>>> > And will be starting my work
>>>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>>>> >
>>>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré
>>>> > <[email protected]> wrote:
>>>> >
>>>> > Agree with Ismaël.
>>>> >
>>>> > I would be more than happy to help on this one (as I contributed
>>>> > on AMQP and JMS IOs ;)).
>>>> >
>>>> > Regards
>>>> > JB
>>>> >
>>>> > On 19/07/2018 10:39, Ismaël Mejía wrote:
>>>> > > Thanks for your interest John, it would be a really nice
>>>> > > contribution to add SQS support.
>>>> > >
>>>> > > Some context on the kinesis stuff:
>>>> > >
>>>> > > The reason why kinesis is still in a separate module is more
>>>> > > related to a licensing problem. Kinesis uses some native libraries
>>>> > > that are published under a not 100% apache compatible license and
>>>> > > we are not allowed to shade and republish them but it seems there
>>>> > > is a workaround now, for more details see
>>>> > > https://issues.apache.org/jira/browse/BEAM-3549
>>>> > > In any case if to use SQS you only need the Apache licensed
>>>> > > aws-sdk deps it is ok (and a good idea) if you put it in the
>>>> > > amazon-web-services module.
>>>> > >
>>>> > > The kinesis connector is way more complex for multiple reasons.
>>>> > > First, the raw version of the amazon client libraries is not so
>>>> > > ‘friendly’, and the guys who created KinesisIO had to do some
>>>> > > workarounds to provide accurate checkpointing/watermarks. So since
>>>> > > SQS is a way simpler system you should probably be ok basing it on
>>>> > > simpler sources like AMQP or JMS.
>>>> > >
>>>> > > If you feel like it, please create the JIRA and don’t hesitate
>>>> > > to ask questions if you find issues or if you need some review.
>>>> > >
>>>> > > On Thu, Jul 19, 2018 at 12:55 AM Lukasz Cwik <[email protected]>
>>>> > > wrote:
>>>> > >>
>>>> > >>
>>>> > >>
>>>> > >> On Wed, Jul 18, 2018 at 3:30 PM John Rudolf Lewis
>>>> > >> <[email protected]> wrote:
>>>> > >>>
>>>> > >>> I need an SQS source for my project that is using beam. A brief
>>>> > >>> search did not turn up any in-progress work in this area. Please
>>>> > >>> point me to the right repo if I missed it.
>>>> > >>
>>>> > >>
>>>> > >> To my knowledge there is none and nobody has marked it in
>>>> > >> progress on https://beam.apache.org/documentation/io/built-in/.
>>>> > >> It would be good to create a JIRA issue on
>>>> > >> https://issues.apache.org/ and send a PR to add SQS to the
>>>> > >> in-progress list referencing your JIRA. I added you as a
>>>> > >> contributor in JIRA so you should be able to assign yourself to
>>>> > >> any issues that you create.
>>>> > >>
>>>> > >>>
>>>> > >>> Assuming there is no in-progress effort, I would like to
>>>> > >>> contribute an Amazon SQS source. I have a few questions before I
>>>> > >>> begin.
>>>> > >>
>>>> > >>
>>>> > >> Great, note that this is a good starting point for authoring an
>>>> > >> IO transform:
>>>> > >> https://beam.apache.org/documentation/io/authoring-overview/
>>>> > >>
>>>> > >>>
>>>> > >>>
>>>> > >>> It seems that the current AWS code is split into two different
>>>> > >>> modules: sdks/java/io/amazon-web-services, which contains the
>>>> > >>> S3FileSystem, AwsOptions, etc., and sdks/java/io/kinesis, which
>>>> > >>> contains an unbounded source based on a kinesis topic. I'd like
>>>> > >>> to add this source to the amazon-web-services module since I'd
>>>> > >>> like to depend on AwsOptions. Does adding this source to the
>>>> > >>> amazon-web-services module make sense?
>>>> > >>
>>>> > >>
>>>> > >> Putting it inside of amazon-web-services makes a lot of sense.
>>>> > >> The Google connectors all live within the one package and there
>>>> > >> has been discussion to consolidate all the AWS stuff under
>>>> > >> amazon-web-services.
>>>> > >>
>>>> > >>>
>>>> > >>> Also, the kinesis source looks a touch more complex than other
>>>> > >>> sources. Both the JMS and AMQP sources look like better examples
>>>> > >>> to follow. Which existing source would be the best to model this
>>>> > >>> contribution after?
>>>> > >>
>>>> > >>
>>>> > >> Some of it has to do with how many ways a source can be read and
>>>> > >> how complicated the watermark tracking is, but it would be best
>>>> > >> if the IO authors comment on implementation details.
>>>> > >>
>>>> > >>>
>>>> > >>> If anyone has put some thoughts into this, or better yet some
>>>> > >>> code, I'd appreciate hearing from you.
>>>> > >>>
>>>> > >>> Thanks!
>>>> > >>>
>>>> >
>>>> > --
>>>> > Jean-Baptiste Onofré
>>>> > [email protected]
>>>> > http://blog.nanthrax.net
>>>> > Talend - http://www.talend.com
>>>> >
>>>> >
>>>>
>>>> --
>>>> Jean-Baptiste Onofré
>>>> [email protected]
>>>> http://blog.nanthrax.net
>>>> Talend - http://www.talend.com
>>>>
>>>
>>>
>
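
For anyone following the thread, here is a minimal sketch of the pattern discussed above: the reader remembers the receipt handles of the messages it has handed out, and the checkpoint mark deletes them from the queue only when the runner calls finalizeCheckpoint. Everything in it is a stand-in written for illustration. QueueClient, CheckpointMarkSketch and ReaderSketch are hypothetical names, not Beam or AWS SDK classes, and the real UnboundedSource.CheckpointMark and UnboundedReader interfaces have more methods than shown here.

```java
import java.util.ArrayList;
import java.util.List;

public class SqsCheckpointSketch {

  /** Hypothetical stand-in for the one queue call used here: delete by receipt handle. */
  interface QueueClient {
    void deleteMessage(String receiptHandle);
  }

  /** Plays the role of UnboundedSource.CheckpointMark in this sketch. */
  static final class CheckpointMarkSketch {
    private final QueueClient client;
    private final List<String> receiptHandles;

    CheckpointMarkSketch(QueueClient client, List<String> receiptHandles) {
      this.client = client;
      // Copy, so later reads do not change what this mark will acknowledge.
      this.receiptHandles = new ArrayList<>(receiptHandles);
    }

    /** Meant to be invoked by the runner once the checkpoint is durably committed. */
    void finalizeCheckpoint() {
      for (String handle : receiptHandles) {
        client.deleteMessage(handle);
      }
    }
  }

  /** Plays the role of the reader: tracks messages read since the last checkpoint. */
  static final class ReaderSketch {
    private final QueueClient client;
    private final List<String> unacked = new ArrayList<>();

    ReaderSketch(QueueClient client) {
      this.client = client;
    }

    /** Record the receipt handle of a message that was emitted downstream. */
    void onMessageRead(String receiptHandle) {
      unacked.add(receiptHandle);
    }

    /** Hand the pending receipt handles over to a new checkpoint mark. */
    CheckpointMarkSketch getCheckpointMark() {
      CheckpointMarkSketch mark = new CheckpointMarkSketch(client, unacked);
      unacked.clear();
      return mark;
    }
  }

  public static void main(String[] args) {
    // An in-memory "queue client" that just records which handles were deleted.
    List<String> deleted = new ArrayList<>();
    ReaderSketch reader = new ReaderSketch(deleted::add);

    reader.onMessageRead("h1");
    reader.onMessageRead("h2");
    CheckpointMarkSketch mark = reader.getCheckpointMark();

    System.out.println("deleted before finalize: " + deleted);
    mark.finalizeCheckpoint();
    System.out.println("deleted after finalize: " + deleted);
  }
}
```

Nothing is deleted until finalizeCheckpoint runs, so if the runner never finalizes a checkpoint (or does so after the visibility timeout), SQS will redeliver the message. A source built this way should be treated as at-least-once, and downstream steps should tolerate duplicates.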
