I think this makes sense, +1 from my side. As I wrote on the ticket, I'm
not aware of any other usages apart from the Kinesis connector, and we
already have a more feature-complete API that can replace the functionality
there.

Best,
D.

On Mon, Feb 27, 2023 at 2:44 PM Zhanghao Chen <zhanghao.c...@outlook.com>
wrote:

> Hi dev,
>
> I'd like to open a discussion on deprecating Global Aggregate Manager in
> favor of Operator Coordinator.
>
>
>   1.  Global Aggregate Manager is rarely used and can be replaced by
> Operator Coordinator. Global Aggregate Manager was introduced in [1] to
> support event time synchronization across sources and, more generally,
> coordination of parallel tasks. AFAIK, this was only used in the Kinesis
> source [2] for an early version of watermark alignment. Operator
> Coordinator, introduced in [3], provides a more powerful and elegant
> solution for that need and is part of the new source API standard.
>   2.  Global Aggregate Manager manages state in the JobMaster object,
> causing problems for adaptive parallelism changes. It maintains state (the
> accumulators field in JobMaster) in JM memory. The content of the
> accumulator state is defined in user code. In my company, a user stores task
> parallelism in the accumulator, assuming task parallelism never changes.
> However, this assumption is broken when using the adaptive scheduler. See [4]
> for more details.
>
> Therefore, I think we should deprecate the use of Global Aggregate
> Manager, which can improve the maintainability of the Flink codebase
> without compromising its functionality. Looking forward to your opinions on
> this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10886
> [2]
> https://github.com/apache/flink-connector-aws/blob/d0817fecdcaa53c4bf039761c2d1a16e8fb9f89b/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/JobManagerWatermarkTracker.java
> [3]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface#FLIP27:RefactorSourceInterface-SplitEnumerator
> [4] FLINK-31245: Adaptive scheduler does not reset the state of
> GlobalAggregateManager when rescaling:
> https://issues.apache.org/jira/browse/FLINK-31245?page=com.atlassian.jira.plugin.system.issuetabpanels%3Aall-tabpanel
>
> Best,
> Zhanghao Chen
>
