[ 
https://issues.apache.org/jira/browse/BEAM-13203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17441208#comment-17441208
 ] 

Moritz Mack edited comment on BEAM-13203 at 11/9/21, 3:30 PM:
--------------------------------------------------------------

[~kenn] [~echauchot] [~aromanenko]
Based on my (potentially flawed) assessment, SnsIO.{*}writeAsync{*} is
conceptually so broken that it will require significant work to get it into a
reliable state. I can hardly imagine anyone running this successfully in
production.

SnsIO.{*}write{*}, on the other hand, is just fine, so I'm not sure what the
right thing to do is (fix it or deprecate/remove it). Most of all, I wanted to
raise a red flag regarding *writeAsync*.

In terms of priorities, it seems almost more important to work towards
feature parity between AWS SDK v1 and v2 so that v1 can finally be deprecated
(and eventually dropped).



> Potential data loss when using SnsIO.writeAsync
> -----------------------------------------------
>
>                 Key: BEAM-13203
>                 URL: https://issues.apache.org/jira/browse/BEAM-13203
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>            Reporter: Moritz Mack
>            Priority: P0
>
> This needs to be investigated: reading the code suggests we might be losing
> data under certain conditions, e.g. when terminating the pipeline. The async
> processing model here is far too simplistic.
> The bundle never knows about pending writes and never blocks to wait for
> any such operation. In the same way, exceptions are thrown into nowhere. Test
> cases don't capture this because they operate on completed futures only (so
> exceptions in the callbacks get thrown on the thread of processElement).
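> {code:java}
> // The callback runs on whichever thread completes the future, possibly after
> // processElement (or the whole bundle) has already finished.
> client.publish(publishRequest).whenComplete((response, ex) -> {
>   if (ex == null) {
>     SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
>     context.output(snsResponse);
>   } else {
>     LOG.error("Error while publishing request to SNS", ex);
>     // Only fails the future returned by whenComplete, which nobody observes.
>     throw new SnsWriteException("Error while publishing request to SNS", ex);
>   }
> });
> {code}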
> Also, this entirely removes backpressure from a stream. When used with a much
> faster source, we will keep accumulating more and more memory, as the
> number of concurrent pending async operations is unbounded.
> Spotify's scio contains a 
> [JavaAsyncDoFn|https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/JavaAsyncDoFn.java]
>  that illustrates how it can be done.
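
The bookkeeping the description says is missing (a bound on in-flight writes, waiting for pending writes before the bundle ends, and surfacing failures on the bundle's own thread) could look roughly like the sketch below. It is a minimal, framework-free sketch under stated assumptions: `BoundedAsyncWriter`, `write`, `flush` and `maxInFlight` are hypothetical names, not part of Beam, SnsIO or scio; the `publish` function stands in for `client.publish(publishRequest)`; and the Beam wiring is left out (the idea is to call `write` from `processElement` and `flush` from a `@FinishBundle` method, where emitting results additionally requires a timestamp and window).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/**
 * Hypothetical helper, not part of Beam, SnsIO or scio. It bounds the number of
 * in-flight async writes and lets the caller wait for all of them on its own thread.
 */
final class BoundedAsyncWriter<T, R> {
  private final Function<T, CompletableFuture<R>> publish; // stand-in for client.publish(...)
  private final Semaphore permits; // one permit per allowed in-flight write
  private final List<CompletableFuture<R>> pending = new ArrayList<>(); // caller thread only

  BoundedAsyncWriter(Function<T, CompletableFuture<R>> publish, int maxInFlight) {
    this.publish = publish;
    this.permits = new Semaphore(maxInFlight);
  }

  /** Meant to be called from processElement; blocks once maxInFlight writes are pending. */
  void write(T element) {
    try {
      permits.acquire(); // backpressure: wait for a free slot
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for a free write slot", e);
    }
    CompletableFuture<R> future;
    try {
      future = publish.apply(element);
    } catch (RuntimeException e) {
      permits.release(); // the write never started, so give the slot back
      throw e;
    }
    // The callback only frees the slot. The returned future completes after the
    // slot is released and carries the original result or failure.
    pending.add(future.whenComplete((response, ex) -> permits.release()));
  }

  /** Meant to be called from finishBundle; waits for every pending write. */
  List<R> flush() {
    List<R> results = new ArrayList<>();
    RuntimeException failure = null;
    for (CompletableFuture<R> f : pending) {
      try {
        results.add(f.join()); // results are collected on the caller's thread
      } catch (CompletionException | CancellationException e) {
        if (failure == null) {
          failure = e;
        } else if (e != failure) {
          failure.addSuppressed(e);
        }
      }
    }
    pending.clear();
    if (failure != null) {
      throw failure; // fail the bundle instead of dropping the error
    }
    return results;
  }
}
```

With `write` called per element and `flush` once per bundle, the semaphore provides the missing backpressure: a faster source is throttled as soon as `maxInFlight` publishes are outstanding, and no write can still be pending when the bundle completes. Completed futures are still held until `flush`, so a production version would probably also drain finished futures during long bundles.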



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
