[ https://issues.apache.org/jira/browse/BEAM-13203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17465984#comment-17465984 ]
Moritz Mack commented on BEAM-13203:
------------------------------------
[~aromanenko] By _conceptually broken_ I mean that the design (and implementation)
is broken with respect to both the Beam programming model and general
async programming:
* Output for a bundle may be emitted even after the bundle has finished (nothing
joins or waits on the pending futures). Additionally, output is emitted from
multiple threads, risking concurrent modification of the underlying data
structures.
* Exceptions disappear into the void (the corresponding tests are broken: they
don't throw asynchronously, which could make one think this works).
* Parallelism is unbounded, causing the writer to immediately accumulate
everything in memory.
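The first two problems can be illustrated without Beam at all. Below is a minimal plain-Java sketch, assuming a hypothetical async client modelled as a `Function<T, CompletableFuture<R>>`; `BundleWriter`, `process` and `finishBundle` are illustrative names standing in for a DoFn's `@ProcessElement` and `@FinishBundle` methods, not SnsIO code. It keeps every pending future, collects results only on the calling thread, and rethrows the first failure when the bundle finishes instead of dropping it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;

// Hypothetical stand-in for a DoFn (no Beam classes involved):
// process() plays the role of @ProcessElement, finishBundle() of @FinishBundle.
class BundleWriter<T, R> {
  private final Function<T, CompletableFuture<R>> client; // hypothetical async call
  private final List<CompletableFuture<R>> pending = new ArrayList<>();

  BundleWriter(Function<T, CompletableFuture<R>> client) {
    this.client = client;
  }

  // Start the request, but never emit output from a callback: just track the future.
  void process(T element) {
    pending.add(client.apply(element));
  }

  // Wait for every pending request before the bundle is considered done.
  // Results are collected on the calling thread, and join() rethrows the first
  // failure as a CompletionException, so the bundle fails instead of losing data.
  List<R> finishBundle() {
    List<R> results = new ArrayList<>();
    try {
      for (CompletableFuture<R> future : pending) {
        results.add(future.join());
      }
    } finally {
      pending.clear();
    }
    return results;
  }
}

class BundleWriterDemo {
  public static void main(String[] args) {
    BundleWriter<String, String> writer =
        new BundleWriter<>(s -> CompletableFuture.supplyAsync(() -> "published:" + s));
    writer.process("a");
    writer.process("b");
    System.out.println(writer.finishBundle()); // prints [published:a, published:b]

    BundleWriter<String, String> failing =
        new BundleWriter<>(s -> CompletableFuture.supplyAsync(() -> {
          throw new IllegalStateException("publish failed for " + s);
        }));
    failing.process("c");
    try {
      failing.finishBundle();
    } catch (CompletionException e) {
      System.out.println("bundle failed: " + e.getCause().getMessage());
    }
  }
}
```

In an actual DoFn, emitting from `@FinishBundle` additionally requires an explicit timestamp and window (`FinishBundleContext.output(value, timestamp, window)`), which this sketch glosses over.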
IMHO, the async writer should simply be deprecated. I don't see much value in
fixing it, as a working writer already exists and the backlog of critical /
important issues for AWS IOs is already far too long.
> Potential data loss when using SnsIO.writeAsync
> -----------------------------------------------
>
> Key: BEAM-13203
> URL: https://issues.apache.org/jira/browse/BEAM-13203
> Project: Beam
> Issue Type: Bug
> Components: io-java-aws
> Reporter: Moritz Mack
> Priority: P1
>
> This needs to be investigated: reading the code suggests we might be losing
> data under certain conditions, e.g. when terminating the pipeline. The async
> processing model here is far too simplistic.
> The bundle never knows about pending writes and never blocks to wait for
> them. Likewise, exceptions are thrown into nowhere. Test cases don't capture
> this because they only operate on already completed futures (so exceptions
> in the callbacks get thrown on the thread of processElement).
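> {code:java}
> client.publish(publishRequest).whenComplete((response, ex) -> {
>   // Runs on the thread that completes the future (the caller's thread only if
>   // it is already complete); nothing makes the bundle wait for it.
>   if (ex == null) {
>     SnsResponse<T> snsResponse = SnsResponse.of(context.element(), response);
>     context.output(snsResponse);
>   } else {
>     LOG.error("Error while publishing request to SNS", ex);
>     // The future returned by whenComplete is discarded, so this is never observed.
>     throw new SnsWriteException("Error while publishing request to SNS", ex);
>   }
> });
> {code}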
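Why the exception in this snippet goes nowhere can be shown with a small stdlib-only demo. It assumes nothing about the SNS client: a plain `CompletableFuture` stands in for the response future, and `SwallowedExceptionDemo` / `publishLikeSnippet` are hypothetical names. The callback throws just as the snippet does, yet neither the caller nor the completing thread sees an exception; the failure is only recorded in the future returned by `whenComplete`, which the snippet discards.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SwallowedExceptionDemo {
  // Same shape as the SnsIO callback: throw from inside whenComplete.
  static CompletableFuture<String> publishLikeSnippet(CompletableFuture<String> response) {
    return response.whenComplete((result, ex) -> {
      if (ex != null) {
        throw new IllegalStateException("Error while publishing request", ex);
      }
    });
  }

  public static void main(String[] args) throws InterruptedException {
    CompletableFuture<String> response = new CompletableFuture<>();
    // The snippet ignores this return value; we keep it only to inspect it afterwards.
    CompletableFuture<String> derived = publishLikeSnippet(response);

    // Simulate the client failing the request later, on one of its own threads.
    Thread clientThread =
        new Thread(() -> response.completeExceptionally(new RuntimeException("SNS unavailable")));
    clientThread.start();
    clientThread.join();

    // No exception reached main or the client thread; it only lives in `derived`.
    System.out.println("derived failed: " + derived.isCompletedExceptionally());
    try {
      derived.join();
    } catch (CompletionException e) {
      System.out.println("visible only via join(): " + e.getCause().getMessage());
    }
  }
}
```

Note that, per the `whenComplete` contract, when the source future already failed, the derived future fails with the original exception rather than the one thrown by the callback.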
> Also, this entirely removes backpressure from the stream. When used with a much
> faster source, memory usage keeps growing, as the number of concurrent pending
> async operations is not limited.
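One common way to restore backpressure is to cap the number of in-flight requests and block the caller once the cap is reached. The following is a minimal stdlib sketch of that technique using a `Semaphore`; `BoundedAsyncSubmitter` and `maxInFlight` are hypothetical names, the client is again assumed to be a `Function<T, CompletableFuture<R>>`, and this is not a transcription of SnsIO or Scio code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical helper: caps the number of concurrently pending async requests.
// Once maxInFlight requests are pending, submit() blocks the calling thread,
// so a faster upstream is slowed down instead of buffering futures in memory.
class BoundedAsyncSubmitter<T, R> {
  private final Semaphore permits;
  private final Function<T, CompletableFuture<R>> client; // hypothetical async call

  BoundedAsyncSubmitter(int maxInFlight, Function<T, CompletableFuture<R>> client) {
    this.permits = new Semaphore(maxInFlight);
    this.client = client;
  }

  CompletableFuture<R> submit(T element) throws InterruptedException {
    permits.acquire(); // blocks while maxInFlight requests are still pending
    CompletableFuture<R> future;
    try {
      future = client.apply(element);
    } catch (RuntimeException e) {
      permits.release(); // the request never started, so give the permit back
      throw e;
    }
    // Free the permit once the request completes, whether it succeeded or failed.
    // The returned future still carries the original result or failure.
    return future.whenComplete((result, ex) -> permits.release());
  }
}
```

A DoFn using this pattern would call `submit` from `@ProcessElement`, so a slow sink stalls a fast source, and would still join the returned futures before the bundle finishes. Blocking the element-processing thread is the deliberate design choice here: it is what propagates backpressure upstream.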
> Spotify's Scio contains a
> [JavaAsyncDoFn|https://github.com/spotify/scio/blob/main/scio-core/src/main/java/com/spotify/scio/transforms/JavaAsyncDoFn.java]
> that illustrates how it can be done.