Some of the queue-based technologies don't have an explicit timestamp (and
even if they do, it's usually the time the message was queued, which is
typically not the timestamp the user wants). A common approach is to expose
an option where the user supplies the name of a property within the SQS
message that should be interpreted as the timestamp. PubsubIO estimates the
watermark by keeping track of the minimum timestamp seen over the last
minute [1]. You can start with something simple and improve the watermark
tracking once you have something working end to end, by looking at how the
other unbounded sources track watermarks effectively (a library of such
statistical methods would be useful for future IO authors as well).
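
As a rough illustration of the two ideas above (a user-named timestamp
property plus a minimum over the last minute), here is a minimal sketch in
plain Java with no Beam or AWS dependencies. Everything in it is an
assumption made for illustration: the class MinTimestampWatermark, its
method names, the epoch-millis encoding and the fallback to receive time are
hypothetical, and it is not how PubsubUnboundedSource is actually
implemented. It also assumes that the "now" values passed in never go
backwards.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

/** Hypothetical helper: not part of Beam or the AWS SDK. */
class MinTimestampWatermark {
    /** One observed message: when we read it and the event time it carried. */
    private static final class Sample {
        final Instant seenAt;
        final Instant eventTime;

        Sample(Instant seenAt, Instant eventTime) {
            this.seenAt = seenAt;
            this.eventTime = eventTime;
        }
    }

    private final String timestampAttribute;
    private final Duration window;
    // Event times strictly increase from head to tail, so the head is
    // always the minimum of the samples still inside the window.
    private final Deque<Sample> samples = new ArrayDeque<>();
    private Instant watermark = Instant.MIN;

    MinTimestampWatermark(String timestampAttribute, Duration window) {
        this.timestampAttribute = timestampAttribute;
        this.window = window;
    }

    /**
     * Reads the user-named attribute as epoch milliseconds (an assumed
     * encoding). Falls back to the receive time if it is missing or invalid.
     */
    Instant extractTimestamp(Map<String, String> attributes, Instant receivedAt) {
        String raw = attributes.get(timestampAttribute);
        if (raw == null) {
            return receivedAt;
        }
        try {
            return Instant.ofEpochMilli(Long.parseLong(raw.trim()));
        } catch (NumberFormatException e) {
            return receivedAt;
        }
    }

    /** Records a message's event time; 'now' is the time it was read. */
    void observe(Instant eventTime, Instant now) {
        // Older samples with an equal or later event time can never be the
        // window minimum again, because the new sample outlives them.
        while (!samples.isEmpty() && !samples.peekLast().eventTime.isBefore(eventTime)) {
            samples.removeLast();
        }
        samples.addLast(new Sample(now, eventTime));
    }

    /** Minimum event time seen within the window; never moves backwards. */
    Instant currentWatermark(Instant now) {
        Instant cutoff = now.minus(window);
        while (!samples.isEmpty() && samples.peekFirst().seenAt.isBefore(cutoff)) {
            samples.removeFirst();
        }
        if (!samples.isEmpty() && samples.peekFirst().eventTime.isAfter(watermark)) {
            watermark = samples.peekFirst().eventTime;
        }
        return watermark;
    }
}
```

An unbounded reader could call observe() for each message it reads and
return currentWatermark() from its watermark method. When no samples remain
in the window this sketch simply holds the last value, which is only one of
several possible policies.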
SplittableDoFn will address the issue of needing a callback to delete state
after it is read, but for now you can model a set of transforms like this:
Read(UnboundedSource) --message id-> Reshuffle -> ParDo(DeleteFromSQS)
                      \--message-> ... rest of pipeline
...
The reshuffle forces a materialization in all runners, which means that the
message id will only get to "DeleteFromSQS" if the message was successfully
read by the unbounded source.
[1] https://github.com/apache/beam/blob/0e18bf4c81e09c193e113c74cac7301dc26dac9e/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java#L91
On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis <[email protected]>
wrote:
> hmm... made lots of progress on this today. But need help understanding
> something....
>
> UnboundedSource seems to assume that there is some guarantee of message
> ordering, and that you can get the timestamp of the current message. Using
> UnboundedSource.CheckpointMark to help advance the offset. Seems to work ok
> for any source that supports those assumptions. But SQS does not work this
> way.
>
> With a standard SQS queue, there is no guarantee of ordering and there is
> no timestamp for a message. With SQS, one needs to call the delete api
> using the receipt handle from the message to acknowledge receipt of a
> message and prevent its redelivery after the visibility timeout has expired.
>
> I'm not sure how to adapt these two patterns and would welcome suggestions.
>
>
>
> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
>> Thx John !
>>
>> Regards
>> JB
>>
>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>> > Thank you.
>> >
>> > I've created a jira ticket to add SQS and have assigned it to
>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>> >
>> > Modified the documentation to show it as in-progress:
>> > https://github.com/apache/beam/pull/5995
>> >
>> > And will be starting my work
>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>> >
>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré <[email protected]>
>> > wrote:
>> >
>> > Agree with Ismaël.
>> >
>> > I would be more than happy to help on this one (as I contributed on
>> > AMQP and JMS IOs ;)).
>> >
>> > Regards
>> > JB
>> >
>> > On 19/07/2018 10:39, Ismaël Mejía wrote:
>> > > Thanks for your interest John, it would be a really nice
>> > > contribution to add SQS support.
>> > >
>> > > Some context on the kinesis stuff:
>> > >
>> > > The reason why kinesis is still in a separate module is more
>> > > related to a licensing problem. Kinesis uses some native libraries
>> > > that are published under a not 100% apache compatible license and
>> > > we are not allowed to shade and republish them but it seems there
>> > > is a workaround now, for more details see
>> > > https://issues.apache.org/jira/browse/BEAM-3549
>> > > In any case if to use SQS you only need the Apache licensed
>> > > aws-sdk deps it is ok (and a good idea) if you put it in the
>> > > amazon-web-services module.
>> > >
>> > > The kinesis connector is way more complex for multiple reasons,
>> > > first, the raw version of the amazon client libraries is not so
>> > > ‘friendly’ and the guys who created KinesisIO had to do some
>> > > workarounds to provide accurate checkpointing/watermarks. So since
>> > > SQS is a way simpler system you should probably be ok basing it in
>> > > simpler sources like AMQP or JMS.
>> > >
>> > > If you feel like to, please create the JIRA and don’t hesitate to
>> > > ask questions if you find issues or if you need some review.
>> > >
>> > > On Thu, Jul 19, 2018 at 12:55 AM Lukasz Cwik <[email protected]>
>> > > wrote:
>> > >>
>> > >>
>> > >>
>> > >> On Wed, Jul 18, 2018 at 3:30 PM John Rudolf Lewis
>> > >> <[email protected]> wrote:
>> > >>>
>> > >>> I need an SQS source for my project that is using beam. A brief
>> > >>> search did not turn up any in-progress work in this area. Please
>> > >>> point me to the right repo if I missed it.
>> > >>
>> > >>
>> > >> To my knowledge there is none and nobody has marked it in
>> > >> progress on https://beam.apache.org/documentation/io/built-in/.
>> > >> It would be good to create a JIRA issue on
>> > >> https://issues.apache.org/ and send a PR to add SQS to the
>> > >> inprogress list referencing your JIRA. I added you as a
>> > >> contributor in JIRA so you should be able to assign yourself to
>> > >> any issues that you create.
>> > >>
>> > >>>
>> > >>> Assuming there is no in-progress effort, I would like to
>> > >>> contribute an Amazon SQS source. I have a few questions before I
>> > >>> begin.
>> > >>
>> > >>
>> > >> Great, note that this is a good starting point for authoring an
>> > >> IO transform:
>> > >> https://beam.apache.org/documentation/io/authoring-overview/
>> > >>
>> > >>>
>> > >>>
>> > >>> It seems that the current AWS code is split into two different
>> > >>> modules: sdks/java/io/amazon-web-services which contains the
>> > >>> S3FileSystem, AwsOptions, etc, and sdks/java/io/kinesis which
>> > >>> contains an unbounded source based on a kinesis topic. I'd like
>> > >>> to add this source to the amazon-web-services module since I'd
>> > >>> like to depend on AwsOptions. Does adding this source to the
>> > >>> amazon-web-services module make sense?
>> > >>
>> > >>
>> > >> Putting it inside of amazon-web-services makes a lot of sense.
>> > >> The Google connectors all live within the one package and there
>> > >> has been discussion to consolidate all the AWS stuff under
>> > >> amazon-web-services.
>> > >>
>> > >>>
>> > >>> Also, the kinesis source looks a touch more complex than other
>> > >>> sources. Both the JMS and AMQP sources look like better examples
>> > >>> to follow. Which existing source would be the best to model this
>> > >>> contribution after?
>> > >>
>> > >>
>> > >> Some of it has to do with how many ways a source can be read and
>> > >> how complicated the watermark tracking but it would be best if
>> > >> the IO authors comment on implementation details.
>> > >>
>> > >>>
>> > >>> If anyone has put some thoughts into this, or better yet some
>> > >>> code, I'd appreciate hearing from you.
>> > >>>
>> > >>> Thanks!
>> > >>>
>> >
>> > --
>> > Jean-Baptiste Onofré
>> > [email protected]
>> > http://blog.nanthrax.net
>> > Talend - http://www.talend.com
>> >
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>