A timestamp for a message is fundamental to an element in a PCollection.
What do you mean by not knowing timestamp of a message?
There is finalizeCheckpoint API[1] in UnboundedSource. Does that help?
PubSub is also very similar, a message need to be acked with in a timeout,
otherwise it will be redelivered to one of the consumer. Pubsub messages
are acked inside finalize().

[1]:
https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129

On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis <[email protected]>
wrote:

> hmm... made lots of progress on this today. But need help understanding
> something....
>
> UnboundedSource seems to assume that there is some guarantee of message
> ordering, and that you can get the timestamp of the current message. Using
> UnboundedSource.CheckpointMark to help advance the offset. Seems to work ok
> for any source that supports those assumptions. But SQS does not work this
> way.
>
> With a standard SQS queue, there is no guarantee of ordering and there is
> no timestamp for a message.  With SQS, one needs to call the delete api
> using the receipt handle from the message to acknowledge receipt of a
> message and prevent its redelivery after the visibility timeout has expired.
>
> I'm not sure how to adapt these two patterns and would welcome suggestions.
>
>
>
> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré <[email protected]>
> wrote:
>
>> Thx John !
>>
>> Regards
>> JB
>>
>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>> > Thank you.
>> >
>> > I've created a jira ticket to add SQS and have assigned it to
>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>> >
>> > Modified the documentation to show it as in-progress:
>> > https://github.com/apache/beam/pull/5995
>> >
>> > And will be starting my work
>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>> >
>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré <[email protected]
>> > <mailto:[email protected]>> wrote:
>> >
>> >     Agree with Ismaël.
>> >
>> >     I would be more than happy to help on this one (as I contributed on
>> AMQP
>> >     and JMS IOs ;)).
>> >
>> >     Regards
>> >     JB
>> >
>> >     On 19/07/2018 10:39, Ismaël Mejía wrote:
>> >     > Thanks for your interest John, it would be a really nice
>> contribution
>> >     > to add SQS support.
>> >     >
>> >     > Some context on the kinesis stuff:
>> >     >
>> >     > The reason why kinesis is still in a separate module is more
>> related
>> >     > to a licensing problem. Kinesis uses some native libraries that
>> are
>> >     > published under a not 100% apache compatible license and we are
>> not
>> >     > allowed to shade and republish them but it seems there is a
>> workaround
>> >     > now, for more details see
>> >     > https://issues.apache.org/jira/browse/BEAM-3549
>> >     <https://issues.apache.org/jira/browse/BEAM-3549>
>> >     > In any case if to use SQS you only need the Apache licensed
>> aws-sdk
>> >     > deps it is ok (and a good idea) if you put it in the
>> >     > amazon-web-services module.
>> >     >
>> >     > The kinesis connector is way more complex for multiple reasons,
>> first,
>> >     > the raw version of the amazon client libraries is not so
>> ‘friendly’
>> >     > and the guys who created KinesisIO had to do some workarounds to
>> >     > provide accurate checkpointing/watermarks. So since SQS is a way
>> >     > simpler system you should probably be ok basing it in simpler
>> sources
>> >     > like AMQP or JMS.
>> >     >
>> >     > If you feel like to, please create the JIRA and don’t hesitate to
>> ask
>> >     > questions if you find issues or if you need some review.
>> >     >
>> >     > On Thu, Jul 19, 2018 at 12:55 AM Lukasz Cwik <[email protected]
>> >     <mailto:[email protected]>> wrote:
>> >     >>
>> >     >>
>> >     >>
>> >     >> On Wed, Jul 18, 2018 at 3:30 PM John Rudolf Lewis
>> >     <[email protected] <mailto:[email protected]>> wrote:
>> >     >>>
>> >     >>> I need an SQS source for my project that is using beam. A brief
>> >     search did not turn up any in-progress work in this area. Please
>> >     point me to the right repo if I missed it.
>> >     >>
>> >     >>
>> >     >> To my knowledge there is none and nobody has marked it in
>> >     progress on https://beam.apache.org/documentation/io/built-in/
>> >     <https://beam.apache.org/documentation/io/built-in/>. It would be
>> >     good to create a JIRA issue on https://issues.apache.org/ and send
>> a
>> >     PR to add SQS to the inprogress list referencing your JIRA. I added
>> >     you as a contributor in JIRA so you should be able to assign
>> >     yourself to any issues that you create.
>> >     >>
>> >     >>>
>> >     >>> Assuming there is no in-progress effort, I would like to
>> >     contribute an Amazon SQS source. I have a few questions before I
>> begin.
>> >     >>
>> >     >>
>> >     >> Great, note that this is a good starting point for authoring an
>> >     IO transform:
>> >     https://beam.apache.org/documentation/io/authoring-overview/
>> >     <https://beam.apache.org/documentation/io/authoring-overview/>
>> >     >>
>> >     >>>
>> >     >>>
>> >     >>> It seems that the current AWS code is split into two different
>> >     modules: sdk/java/io/amazon-web-services which contains the
>> >     S3FileSystem, AwsOptions, etc, and sdk/java/io/kinesis which
>> >     contains an unbounded source based on a kinesis topic. I'd like to
>> >     add this source to the amazon-web-services module since I'd like to
>> >     depend on AwsOptions. Does adding this source to the
>> >     amazon-web-services module make sense?
>> >     >>
>> >     >>
>> >     >> Putting it inside of amazon-web-services makes a lot of sense.
>> >     The Google connectors all live within the one package and there has
>> >     been discussion to consolidate all the AWS stuff under
>> >     amazon-web-services.
>> >     >>
>> >     >>>
>> >     >>> Also, the kinesis source looks a touch more complex than other
>> >     sources. Both the JMS and AMQP sources look like better examples to
>> >     follow. Which existing source would be the best to model this
>> >     contribution after?
>> >     >>
>> >     >>
>> >     >> Some of it has to do with how many ways a source can be read and
>> >     how complicated the watermark tracking but it would be best if the
>> >     IO authors comment on implementation details.
>> >     >>
>> >     >>>
>> >     >>> If anyone has put some thoughts into this, or better yet some
>> >     code, I'd appreciate hearing from you.
>> >     >>>
>> >     >>> Thanks!
>> >     >>>
>> >
>> >     --
>> >     Jean-Baptiste Onofré
>> >     [email protected] <mailto:[email protected]>
>> >     http://blog.nanthrax.net
>> >     Talend - http://www.talend.com
>> >
>> >
>>
>> --
>> Jean-Baptiste Onofré
>> [email protected]
>> http://blog.nanthrax.net
>> Talend - http://www.talend.com
>>
>
>

Reply via email to