[ 
https://issues.apache.org/jira/browse/FLINK-27405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17530078#comment-17530078
 ] 

Steven Zhen Wu commented on FLINK-27405:
----------------------------------------

[~arvid] Thanks a lot for the feedback. We will create a FLIP for the new 
public abstract class `CoordinatorBase`. Yes, we can drive the effort.

For now, we don't plan to put any checkpoint-related logic into 
`CoordinatorBase`. In the first iteration, we will move only the minimally 
required (and reasonable) APIs to the base class.

> Refactor SourceCoordinator to abstract BaseCoordinator implementation
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27405
>                 URL: https://issues.apache.org/jira/browse/FLINK-27405
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: gang ye
>            Priority: Major
>
> To solve the small-files issue caused by data skew, Flink Iceberg data 
> shuffling was proposed (design doc: 
> https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit).
> The basic idea is to use a statistics operator to collect local statistics on 
> traffic distribution at the taskmanagers (workers). Local statistics are 
> periodically sent to the statistics coordinator (running in the jobmanager). 
> Once globally aggregated statistics are ready, the statistics coordinator 
> broadcasts them to all operator instances. A customized partitioner then 
> uses the global statistics, passed down from the statistics operator, to 
> distribute data to the Iceberg writers.
> While implementing Flink Iceberg data shuffling, we found that 
> StatisticsCoordinator can share functionality with 
> SourceCoordinator#runInEventLoop, that StatisticsCoordinatorContext needs a 
> function similar to SourceCoordinatorContext#callInCoordinatorThread, and 
> that the StatisticsCoordinatorProvider ExecutorThreadFactory logic is almost 
> the same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. We 
> would therefore like to refactor the source coordinator classes to abstract 
> a general coordinator implementation and reduce duplicated code when adding 
> other coordinators. 
>  
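
The shared pieces named in the description (an event loop, a call into the 
coordinator thread, and a thread factory) could be sketched roughly as below. 
This is only an illustration under assumptions: `CoordinatorBaseSketch`, 
`CoordinatorThreadFactory`, `StatisticsCoordinatorSketch` and all method 
signatures are hypothetical, use only the JDK, and are not the Flink classes 
or the API that the FLIP will eventually propose.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/** Hypothetical thread factory that names the single coordinator thread. */
final class CoordinatorThreadFactory implements ThreadFactory {
    private final String threadName;

    CoordinatorThreadFactory(String threadName) {
        this.threadName = threadName;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, threadName);
        t.setDaemon(true); // do not keep the JVM alive for this sketch
        return t;
    }
}

/** Hypothetical base class holding the plumbing that coordinators would share. */
abstract class CoordinatorBaseSketch implements AutoCloseable {
    private final ExecutorService coordinatorExecutor;

    protected CoordinatorBaseSketch(String threadName) {
        // A single thread means coordinator state needs no extra locking.
        this.coordinatorExecutor =
                Executors.newSingleThreadExecutor(new CoordinatorThreadFactory(threadName));
    }

    /** Runs an action on the coordinator thread (assumed signature). */
    protected CompletableFuture<Void> runInEventLoop(Runnable action) {
        return CompletableFuture.runAsync(action, coordinatorExecutor);
    }

    /** Computes a value on the coordinator thread (assumed signature). */
    protected <T> CompletableFuture<T> callInCoordinatorThread(Callable<T> callable) {
        CompletableFuture<T> result = new CompletableFuture<>();
        coordinatorExecutor.execute(() -> {
            try {
                result.complete(callable.call());
            } catch (Exception e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    @Override
    public void close() {
        coordinatorExecutor.shutdownNow();
    }
}

/** Hypothetical coordinator that sums per-key counts reported by operators. */
final class StatisticsCoordinatorSketch extends CoordinatorBaseSketch {
    // Read and written only on the coordinator thread.
    private final Map<String, Long> globalCounts = new HashMap<>();

    StatisticsCoordinatorSketch() {
        super("statistics-coordinator-sketch");
    }

    CompletableFuture<Void> handleLocalStatistics(Map<String, Long> local) {
        Map<String, Long> copy = new HashMap<>(local); // defensive copy
        return runInEventLoop(
                () -> copy.forEach((key, count) -> globalCounts.merge(key, count, Long::sum)));
    }

    CompletableFuture<Map<String, Long>> globalStatistics() {
        return callInCoordinatorThread(() -> new HashMap<String, Long>(globalCounts));
    }
}
```

In this sketch a concrete coordinator only adds its own state and event 
handling, while the threading code lives in one place. Checkpoint-related 
logic is deliberately left out, in line with the comment above.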



--
This message was sent by Atlassian Jira
(v8.20.7#820007)