[
https://issues.apache.org/jira/browse/FLINK-19255?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Kyle Bendickson updated FLINK-19255:
------------------------------------
Environment:
Any flink job using Async IO post this PR:
[https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
(so I believe anything starting at either 1.9 or 1.10).
was:
Any flink job using AsyncIO post this PR:
[https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
(so I believe anything starting at either 1.9 or 1.10).
> Add configuration to make AsyncWaitOperation Chainable
> ------------------------------------------------------
>
> Key: FLINK-19255
> URL: https://issues.apache.org/jira/browse/FLINK-19255
> Project: Flink
> Issue Type: Task
> Components: API / Core
> Affects Versions: 1.10.2, 1.11.2
> Environment: Any flink job using Async IO post this PR:
> [https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
> (so I believe anything starting at either 1.9 or 1.10).
>
> Reporter: Kyle Bendickson
> Priority: Major
>
> Currently, we no longer chain AsyncIO calls. Instead, anything using AsyncIO
> starts the new head of an operator chain as a temporary workaround for this
> issue: https://issues.apache.org/jira/browse/FLINK-13063
>
> However, because this change can have (and in my customers' cases does have)
> a very large impact on the job graph size, and because people had previously
> accepted their results, the 1.10 release made it possible to chain
> AsyncWaitOperator again in this issue:
> https://issues.apache.org/jira/browse/FLINK-16219.
>
> However, calling out to operator factory methods is complicated and
> unintuitive for users. I have users who would very much like their AsyncIO
> calls not to start a new chain, as this has ballooned the number of state
> stores they have and they had accepted their previous results. The only
> example I could find was in the tests, and it's rather convoluted.
>
> My proposal is to add a config check just before the following line in
> AsyncWaitOperator.java, so that the line can be skipped when the config is
> set. The line is currently hardcoded into the operator, which is what
> requires one to use the operator factory:
> {noformat}
> setChainingStrategy(ChainingStrategy.ALWAYS){noformat}
>
> [https://github.com/apache/flink/pull/11177/files#diff-00b95aec1fa804a531b553610ec74c27R117]
>
> Given that this is considered potentially unsafe / legacy behavior, I would
> suggest that we add a config, something that explicitly calls this out as
> unsafe / legacy, so that users do not have to go through the unintuitive
> process of using operator factories but that new users know not to use this
> option or to use it at their own risk. We could also document that it is not
> necessarily going to be supported in the future if need be.
>
> My suggestions for a config name that would avoid that setChainingStrategy
> line include
> {noformat}
> taskmanager.async-io-operator.legacy-unsafe-chaining-strategy{noformat}
> which specifically calls this behavior out as legacy and unsafe.
>
> Another possible name could be
> {noformat}
> pipeline.operator-chaining.async-io.legacy-inconsistent-chaining-strategy-always{noformat}
> (which would be more in line with the existing config of
> pipeline.operator-chaining).
>
>
> Given that it is already possible to change this chaining behavior, just in
> a very unintuitive way that requires using operator factories, I think that
> this configuration would be a good addition. I would be happy to submit a PR
> with tests and updated documentation, so that power users who are looking to
> do this could enable / disable this behavior without having to change their
> code much.
>
> I recognize that this might be an odd request as this has been deemed unsafe,
> but this change has made it very difficult for some of my users to use
> RocksDB, namely those with very large state who previously made very liberal
> use of AsyncIO (especially for things like analytics events, which can be
> sent on a best-effort basis) and who therefore have a very large job graph
> after this change.
>
> If anybody has any better suggestions for names, I'd be open to them. And
> then as mentioned, I'd be happy to submit a PR with tests etc.
>
> For reference, here are the tests where I found the ability to use the
> operator factory and here is the utility function which is needed to create a
> chained async io operator vertex. Note that this utility function is in the
> test and not part of the public facing API.
> [https://github.com/apache/flink/blob/3a04e179e09224b09c4ee656d31558844da83a26/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java#L880-L912]
> If there is a simpler way to handle this, I'd be happy to hear it. Otherwise,
> since this behavior is technically already specifically enabled (as called
> out in the changelog from Flink 1.11), I think it makes sense to add a config
> and document that it's legacy behavior, that it's unsafe (or inconsistent, up
> to you), and that it could go away at any time.
>
> But it seems unnecessary to require users to jump through so many extra hoops
> in the code, especially for users who share operators amongst different jobs
> which might be configured to use different state backends. Not to mention
> that some of these users want the legacy behavior while others would prefer
> to play it safe and accept the additional shuffle, so a code fix is not
> always feasible when code is shared, whereas enabling / disabling a
> cluster-level config would still allow for shared code.
>
> I'd be happy to submit a patch once this issue is discussed.
>
> Thank you,
> Kyle B. - Data Services @ Tinder
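
For illustration, the opt-in config check proposed in the issue might look roughly like the following. This is a minimal sketch under stated assumptions, not Flink code: `ChainingStrategy` here is a two-value stand-in enum rather than Flink's class, the configuration is modeled as a plain `Map<String, String>`, the helper `resolveChainingStrategy` is hypothetical, and the key is one of the names suggested above, which is not an existing Flink option. The default is assumed to be `HEAD`, matching the description that the async operator currently starts a new chain.

```java
import java.util.Map;

/** Sketch only: stand-in types, not Flink classes. */
public class AsyncChainingConfigSketch {

    /** Stand-in for Flink's ChainingStrategy, limited to the two values discussed here. */
    enum ChainingStrategy { ALWAYS, HEAD }

    /** One of the names proposed in the issue; assumed, not an existing Flink option. */
    static final String LEGACY_CHAINING_KEY =
            "taskmanager.async-io-operator.legacy-unsafe-chaining-strategy";

    /**
     * Hypothetical helper: returns ALWAYS only when the user explicitly opts in
     * to the legacy behavior; otherwise HEAD, so the async operator starts a new chain.
     */
    static ChainingStrategy resolveChainingStrategy(Map<String, String> config) {
        boolean legacyChaining =
                Boolean.parseBoolean(config.getOrDefault(LEGACY_CHAINING_KEY, "false"));
        return legacyChaining ? ChainingStrategy.ALWAYS : ChainingStrategy.HEAD;
    }

    public static void main(String[] args) {
        // No opt-in: the safe default applies.
        System.out.println(resolveChainingStrategy(Map.of()));
        // Explicit opt-in to the legacy, potentially unsafe chaining.
        System.out.println(resolveChainingStrategy(Map.of(LEGACY_CHAINING_KEY, "true")));
    }
}
```

Keeping the safe strategy as the default means a job only gets the legacy chaining when the flag is set explicitly, which fits the request that shared operator code stay unchanged while the choice is made per cluster.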