On Mon, Jul 23, 2018 at 2:25 PM John Rudolf Lewis <[email protected]>
wrote:

> So I guess I can add a timestamp to the message attributes when I receive
> it from SQS, since there is no such built-in property.
> But what triggers finalizeCheckpoint to be called? So far in my testing, I
> never see that method get called, and hence my messages keep getting
> redelivered.
>

Are you testing with the direct runner? It should be called after the first
stage processes the messages (i.e. once the checkpoint mark has been durably
committed by the runner).

Raghu.
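A minimal sketch of the acknowledge-on-finalize pattern described above, assuming the reader collects the receipt handle of every message it emits and passes them to the checkpoint mark it returns. `CheckpointMark` here is a simplified stand-in mirroring the single `finalizeCheckpoint()` method of Beam's `UnboundedSource.CheckpointMark`, and `MessageDeleter` is a hypothetical hook in place of the real SQS delete call, so the example compiles without Beam or the AWS SDK.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified stand-in mirroring Beam's UnboundedSource.CheckpointMark,
// whose finalizeCheckpoint() is declared to throw IOException.
interface CheckpointMark {
  void finalizeCheckpoint() throws IOException;
}

// Hypothetical hook standing in for the SQS delete-message call, which
// acknowledges a message by its receipt handle.
interface MessageDeleter {
  void delete(String receiptHandle);
}

// Remembers the receipt handles of messages read since the previous
// checkpoint and deletes them only when the runner finalizes the checkpoint.
class SqsCheckpointMark implements CheckpointMark {
  private final List<String> receiptHandles;
  private final MessageDeleter deleter;

  SqsCheckpointMark(List<String> receiptHandles, MessageDeleter deleter) {
    // Defensive copy: messages the reader reads later belong to the next checkpoint.
    this.receiptHandles = new ArrayList<>(receiptHandles);
    this.deleter = deleter;
  }

  List<String> getReceiptHandles() {
    return Collections.unmodifiableList(receiptHandles);
  }

  // Declared without the checked exception because this sketch's
  // deleter never throws one.
  @Override
  public void finalizeCheckpoint() {
    // The runner calls this after the checkpoint (and the records read
    // before it) has been durably committed, so deleting is now safe.
    for (String handle : receiptHandles) {
      deleter.delete(handle);
    }
  }
}
```

If a checkpoint is never finalized (for example after a failure), nothing is deleted, the visibility timeout lapses and SQS redelivers the messages, giving at-least-once delivery. Beam's Javadoc also notes that the reader may no longer exist when `finalizeCheckpoint()` runs, which is why the sketch keeps everything it needs inside the mark itself.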


>
>
> On Thu, Jul 19, 2018 at 5:26 PM, Raghu Angadi <[email protected]> wrote:
>
>> A timestamp for a message is fundamental to an element in a PCollection.
>> What do you mean by not knowing the timestamp of a message?
>> There is a finalizeCheckpoint API[1] in UnboundedSource. Does that help?
>> PubSub is also very similar: a message needs to be acked within a timeout,
>> otherwise it will be redelivered to one of the consumers. PubSub messages
>> are acked inside finalize().
>>
>> [1]:
>> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/UnboundedSource.java#L129
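On the timestamp question: a standard SQS queue can return a `SentTimestamp` system attribute (epoch milliseconds) when the receive request asks for it, so the reader may not need to invent one. The sketch below, a hypothetical helper rather than any existing Beam or AWS API, prefers that attribute and falls back to the time the reader received the message. It uses `java.time.Instant` to stay dependency-free; Beam's `UnboundedReader.getCurrentTimestamp()` returns a Joda-Time `Instant`.

```java
import java.time.Instant;
import java.util.Map;

// Hypothetical helper that picks an event timestamp for an SQS message.
final class SqsTimestamps {
  // Name of the SQS system attribute holding the send time in epoch millis.
  static final String SENT_TIMESTAMP = "SentTimestamp";

  private SqsTimestamps() {}

  // attributes: the message's system attributes (name -> value), present
  //   only if the receive request asked for them.
  // receivedAt: when the reader pulled the message off the queue.
  static Instant timestampFor(Map<String, String> attributes, Instant receivedAt) {
    String sent = attributes.get(SENT_TIMESTAMP);
    if (sent != null) {
      try {
        return Instant.ofEpochMilli(Long.parseLong(sent));
      } catch (NumberFormatException e) {
        // Malformed value: fall back to the receive time below.
      }
    }
    return receivedAt;
  }
}
```

Preferring the send time keeps an element's timestamp stable if SQS redelivers the message, whereas a receive time would change on every delivery.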
>>
>>
>> On Thu, Jul 19, 2018 at 3:28 PM John Rudolf Lewis <[email protected]>
>> wrote:
>>
>>> Hmm... I made lots of progress on this today, but I need help understanding
>>> something.
>>>
>>> UnboundedSource seems to assume that there is some guarantee of message
>>> ordering and that you can get the timestamp of the current message, with
>>> UnboundedSource.CheckpointMark used to advance the offset. That seems to
>>> work OK for any source that supports those assumptions, but SQS does not
>>> work this way.
>>>
>>> With a standard SQS queue, there is no guarantee of ordering and there
>>> is no timestamp for a message.  With SQS, one needs to call the delete api
>>> using the receipt handle from the message to acknowledge receipt of a
>>> message and prevent its redelivery after the visibility timeout has expired.
>>>
>>> I'm not sure how to reconcile these two models and would welcome
>>> suggestions.
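To make the SQS semantics above concrete, here is a toy in-memory model (not the AWS API; every name is hypothetical) of a standard queue's visibility timeout: a received message is hidden for the timeout, reappears if it is not deleted by its receipt handle, and gets a new receipt handle each time it is received. This is the redelivery seen when nothing ever deletes the messages. The model returns messages in send order only to stay short; a standard queue guarantees no order.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy in-memory model of a standard SQS queue's visibility timeout.
// NOT the AWS API: names and behavior are simplified for illustration.
final class ToyQueue {
  // What a consumer gets back from receive(): the body plus a receipt handle.
  static final class Received {
    final String body;
    final String receiptHandle;

    Received(String body, String receiptHandle) {
      this.body = body;
      this.receiptHandle = receiptHandle;
    }
  }

  private static final class Entry {
    final String body;
    long invisibleUntil = 0;  // visible again once now >= invisibleUntil
    String receiptHandle;     // handle issued by the most recent receive

    Entry(String body) {
      this.body = body;
    }
  }

  private final List<Entry> entries = new ArrayList<>();
  private final long visibilityTimeoutMillis;
  private long handleCounter = 0;

  ToyQueue(long visibilityTimeoutMillis) {
    this.visibilityTimeoutMillis = visibilityTimeoutMillis;
  }

  void send(String body) {
    entries.add(new Entry(body));
  }

  // Returns every message visible at time `now` and hides each one for the
  // visibility timeout. Each receive issues a fresh receipt handle.
  List<Received> receive(long now) {
    List<Received> out = new ArrayList<>();
    for (Entry e : entries) {
      if (now >= e.invisibleUntil) {
        e.invisibleUntil = now + visibilityTimeoutMillis;
        e.receiptHandle = "rh-" + (++handleCounter);
        out.add(new Received(e.body, e.receiptHandle));
      }
    }
    return out;
  }

  // Acknowledges a message. In this toy model only the latest receipt
  // handle works; returns false if no message matches.
  boolean delete(String receiptHandle) {
    Iterator<Entry> it = entries.iterator();
    while (it.hasNext()) {
      if (receiptHandle.equals(it.next().receiptHandle)) {
        it.remove();
        return true;
      }
    }
    return false;
  }
}
```

In an UnboundedSource, the receipt handles are then the natural thing for a checkpoint mark to carry, much as an offset is for ordered sources.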
>>>
>>>
>>>
>>> On Thu, Jul 19, 2018 at 7:40 AM, Jean-Baptiste Onofré <[email protected]>
>>> wrote:
>>>
>>>> Thx John !
>>>>
>>>> Regards
>>>> JB
>>>>
>>>> On 19/07/2018 16:39, John Rudolf Lewis wrote:
>>>> > Thank you.
>>>> >
>>>> > I've created a JIRA ticket to add SQS and have assigned it to
>>>> > myself: https://issues.apache.org/jira/browse/BEAM-4828
>>>> >
>>>> > Modified the documentation to show it as in-progress:
>>>> > https://github.com/apache/beam/pull/5995
>>>> >
>>>> > And will be starting my work
>>>> > here: https://github.com/JohnRudolfLewis/beam/tree/Add-SqsIO
>>>> >
>>>> > On Thu, Jul 19, 2018 at 1:43 AM, Jean-Baptiste Onofré
>>>> > <[email protected]> wrote:
>>>> >
>>>> >     Agree with Ismaël.
>>>> >
>>>> >     I would be more than happy to help on this one (as I contributed
>>>> >     on AMQP and JMS IOs ;)).
>>>> >
>>>> >     Regards
>>>> >     JB
>>>> >
>>>> >     On 19/07/2018 10:39, Ismaël Mejía wrote:
>>>> >     > Thanks for your interest, John. It would be a really nice
>>>> >     > contribution to add SQS support.
>>>> >     >
>>>> >     > Some context on the kinesis stuff:
>>>> >     >
>>>> >     > The reason why Kinesis is still in a separate module has more to
>>>> >     > do with a licensing problem. Kinesis uses some native libraries
>>>> >     > that are published under a license that is not 100% Apache
>>>> >     > compatible, and we are not allowed to shade and republish them,
>>>> >     > but it seems there is now a workaround; for more details see
>>>> >     > https://issues.apache.org/jira/browse/BEAM-3549
>>>> >     > In any case, if SQS only needs the Apache-licensed aws-sdk deps,
>>>> >     > it is OK (and a good idea) to put it in the amazon-web-services
>>>> >     > module.
>>>> >     >
>>>> >     > The Kinesis connector is way more complex for multiple reasons.
>>>> >     > First, the raw version of the Amazon client libraries is not so
>>>> >     > ‘friendly’, and the guys who created KinesisIO had to do some
>>>> >     > workarounds to provide accurate checkpointing/watermarks. Since
>>>> >     > SQS is a much simpler system, you should probably be OK basing it
>>>> >     > on simpler sources like AMQP or JMS.
>>>> >     >
>>>> >     > If you feel like it, please create the JIRA, and don’t hesitate
>>>> >     > to ask questions if you run into issues or need some review.
>>>> >     >
>>>> >     > On Thu, Jul 19, 2018 at 12:55 AM Lukasz Cwik <[email protected]>
>>>> >     > wrote:
>>>> >     >>
>>>> >     >>
>>>> >     >>
>>>> >     >> On Wed, Jul 18, 2018 at 3:30 PM John Rudolf Lewis
>>>> >     >> <[email protected]> wrote:
>>>> >     >>>
>>>> >     >>> I need an SQS source for my project that is using Beam. A brief
>>>> >     >>> search did not turn up any in-progress work in this area.
>>>> >     >>> Please point me to the right repo if I missed it.
>>>> >     >>
>>>> >     >>
>>>> >     >> To my knowledge there is none, and nobody has marked it as in
>>>> >     >> progress on https://beam.apache.org/documentation/io/built-in/.
>>>> >     >> It would be good to create a JIRA issue on
>>>> >     >> https://issues.apache.org/ and send a PR that adds SQS to the
>>>> >     >> in-progress list, referencing your JIRA. I added you as a
>>>> >     >> contributor in JIRA, so you should be able to assign yourself
>>>> >     >> to any issues that you create.
>>>> >     >>
>>>> >     >>>
>>>> >     >>> Assuming there is no in-progress effort, I would like to
>>>> >     >>> contribute an Amazon SQS source. I have a few questions before
>>>> >     >>> I begin.
>>>> >     >>
>>>> >     >>
>>>> >     >> Great. Note that this is a good starting point for authoring an
>>>> >     >> IO transform:
>>>> >     >> https://beam.apache.org/documentation/io/authoring-overview/
>>>> >     >>
>>>> >     >>>
>>>> >     >>>
>>>> >     >>> It seems that the current AWS code is split into two different
>>>> >     >>> modules: sdks/java/io/amazon-web-services, which contains the
>>>> >     >>> S3FileSystem, AwsOptions, etc., and sdks/java/io/kinesis, which
>>>> >     >>> contains an unbounded source based on a Kinesis stream. I'd like
>>>> >     >>> to add this source to the amazon-web-services module since I'd
>>>> >     >>> like to depend on AwsOptions. Does adding this source to the
>>>> >     >>> amazon-web-services module make sense?
>>>> >     >>
>>>> >     >>
>>>> >     >> Putting it inside of amazon-web-services makes a lot of sense.
>>>> >     >> The Google connectors all live within one package, and there has
>>>> >     >> been discussion about consolidating all the AWS stuff under
>>>> >     >> amazon-web-services.
>>>> >     >>
>>>> >     >>>
>>>> >     >>> Also, the Kinesis source looks a touch more complex than other
>>>> >     >>> sources. Both the JMS and AMQP sources look like better examples
>>>> >     >>> to follow. Which existing source would be the best to model
>>>> >     >>> this contribution after?
>>>> >     >>
>>>> >     >>
>>>> >     >> Some of it has to do with how many ways a source can be read and
>>>> >     >> how complicated the watermark tracking is, but it would be best
>>>> >     >> if the IO authors commented on implementation details.
>>>> >     >>
>>>> >     >>>
>>>> >     >>> If anyone has put some thought into this, or better yet some
>>>> >     >>> code, I'd appreciate hearing from you.
>>>> >     >>>
>>>> >     >>> Thanks!
>>>> >     >>>
>>>> >
>>>> >     --
>>>> >     Jean-Baptiste Onofré
>>>> >     [email protected] <mailto:[email protected]>
>>>> >     http://blog.nanthrax.net
>>>> >     Talend - http://www.talend.com
>>>> >
>>>> >
>>>>
>>>>
>>>
>>>
>
