[ https://issues.apache.org/jira/browse/FLINK-27405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17529957#comment-17529957 ]

Arvid Heise commented on FLINK-27405:
-------------------------------------

Hi [~stevenz3wu],

I think this approach looks promising, here are a couple of gotchas:
- The coordinator API is internal, so you need a FLIP either to make it public 
or to create some public abstraction on top of it. This is especially 
important for Iceberg since it lives in its own repository (as all connectors 
should).
- Coordinators checkpoint independently of the related subtasks, at the 
beginning of a checkpoint on the job manager. For a statistics component that 
should not matter in practice, but you should be careful about who owns which 
part of the information and what happens when these parts are checkpointed out 
of sync and recovered later. One way would be to always checkpoint the 
collected statistics at the subtask level and to not checkpoint any derived 
information. Upon recovery, all subtasks resend their recovered statistics to 
the coordinator. That makes the coordinator effectively stateless.
- This coordinator checkpointing behavior might make it harder to release the 
coordinator API publicly. Alternatively, we could implement some kind of 
checkpointing alignment between coordinators and subtasks as a separate effort 
that acts as a prerequisite.
- Metric groups are still not implemented on the coordinator side :(
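A minimal plain-Java sketch of the "effectively stateless coordinator" idea from the second point, with no Flink dependencies. All names (StatisticsSubtask, StatisticsCoordinator, snapshotState, restoreAndResend, handleLocalStatistics) are hypothetical and are not Flink's OperatorCoordinator API; in a real job these maps would presumably travel as operator events and be stored in subtask state. The point it illustrates: only subtasks checkpoint statistics, and a recovered coordinator rebuilds its global view purely from what subtasks resend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not Flink's OperatorCoordinator API: statistics are owned and
// checkpointed only by subtasks; the coordinator keeps derived data in memory only.
public class StatelessCoordinatorSketch {

    /** Subtask side: the only place where statistics are checkpointed. */
    static final class StatisticsSubtask {
        final int subtaskIndex;
        private Map<String, Long> localCounts = new HashMap<>();

        StatisticsSubtask(int subtaskIndex) {
            this.subtaskIndex = subtaskIndex;
        }

        void record(String key) {
            localCounts.merge(key, 1L, Long::sum);
        }

        /** Copy of the local statistics for the subtask's own checkpoint. */
        Map<String, Long> snapshotState() {
            return new HashMap<>(localCounts);
        }

        /** On recovery: restore from the subtask checkpoint, then resend to the coordinator. */
        void restoreAndResend(Map<String, Long> restored, StatisticsCoordinator coordinator) {
            localCounts = new HashMap<>(restored);
            coordinator.handleLocalStatistics(subtaskIndex, snapshotState());
        }
    }

    /** Coordinator side: holds no checkpointed state. */
    static final class StatisticsCoordinator {
        // Latest report per subtask; resending replaces the old report instead of double-counting.
        private final Map<Integer, Map<String, Long>> latestPerSubtask = new HashMap<>();

        void handleLocalStatistics(int subtaskIndex, Map<String, Long> stats) {
            latestPerSubtask.put(subtaskIndex, stats);
        }

        /** Derived global view, recomputed on demand and never checkpointed. */
        Map<String, Long> globalStatistics() {
            Map<String, Long> global = new TreeMap<>();
            for (Map<String, Long> stats : latestPerSubtask.values()) {
                stats.forEach((key, count) -> global.merge(key, count, Long::sum));
            }
            return global;
        }
    }

    public static void main(String[] args) {
        StatisticsSubtask s0 = new StatisticsSubtask(0);
        StatisticsSubtask s1 = new StatisticsSubtask(1);
        s0.record("a");
        s0.record("a");
        s1.record("a");
        s1.record("b");

        // Each subtask checkpoints its own statistics.
        Map<String, Long> checkpoint0 = s0.snapshotState();
        Map<String, Long> checkpoint1 = s1.snapshotState();

        // After a failover the coordinator starts empty and is rebuilt from resent state.
        StatisticsCoordinator recovered = new StatisticsCoordinator();
        new StatisticsSubtask(0).restoreAndResend(checkpoint0, recovered);
        new StatisticsSubtask(1).restoreAndResend(checkpoint1, recovered);

        System.out.println(recovered.globalStatistics()); // prints {a=3, b=1}
    }
}
```

Keying reports by subtask index makes a resend idempotent, so it does not matter whether the coordinator's own snapshot was taken before or after a subtask's snapshot.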

I do believe that a public coordinator API would be beneficial for quite a few 
advanced use cases. I'm not sure whether you or someone else has the capacity 
to lead this effort.

> Refactor SourceCoordinator to abstract BaseCoordinator implementation
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27405
>                 URL: https://issues.apache.org/jira/browse/FLINK-27405
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: gang ye
>            Priority: Major
>
> To solve the small-files issue caused by data skew, Flink Iceberg data 
> shuffling was proposed (design doc: 
> [https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit]).
> The basic idea is to use a statistics operator to collect local statistics on 
> the traffic distribution at the task managers (workers). Local statistics are 
> periodically sent to the statistics coordinator (running in the job manager). 
> Once the globally aggregated statistics are ready, the statistics coordinator 
> broadcasts them to all operator instances. A customized partitioner then uses 
> the global statistics, passed down from the statistics operator, to 
> distribute data to the Iceberg writers.
> While implementing Flink Iceberg data shuffling, we found that 
> StatisticsCoordinator can share functionality with 
> SourceCoordinator#runInEventLoop, StatisticsCoordinatorContext needs a 
> function similar to SourceCoordinatorContext#callInCoordinatorThread, and the 
> StatisticsCoordinatorProvider ExecutorThreadFactory logic is almost the same 
> as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. So we would 
> like to refactor the source coordinator classes to abstract a general 
> coordinator implementation, reducing duplicated code when adding other 
> coordinators.
>  
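The refactoring proposed in the description could look roughly like the following plain-Java sketch, assuming the shared pieces are a single-threaded event loop, a blocking "call in coordinator thread" helper, and a thread factory for the one coordinator thread. BaseCoordinatorSketch, CoordinatorThreadFactory and StatisticsCoordinatorSketch are hypothetical names; runInEventLoop and callInCoordinatorThread borrow the method names mentioned above, but their signatures here are assumptions and not Flink's actual ones.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a shared base coordinator, NOT Flink's actual classes.
public abstract class BaseCoordinatorSketch implements AutoCloseable {

    /** Shared factory for the single, named coordinator thread. */
    static final class CoordinatorThreadFactory implements ThreadFactory {
        private final String name;
        private Thread thread;

        CoordinatorThreadFactory(String name) {
            this.name = name;
        }

        @Override
        public synchronized Thread newThread(Runnable r) {
            thread = new Thread(r, name);
            thread.setDaemon(true);
            return thread;
        }

        synchronized boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == thread;
        }
    }

    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService coordinatorExecutor;

    protected BaseCoordinatorSketch(String coordinatorName) {
        this.threadFactory = new CoordinatorThreadFactory("Coordinator-" + coordinatorName);
        this.coordinatorExecutor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /** Runs an action asynchronously in the coordinator's event loop. */
    protected void runInEventLoop(Runnable action) {
        coordinatorExecutor.execute(action);
    }

    /** Runs a callable in the coordinator thread and blocks until it returns. */
    protected <T> T callInCoordinatorThread(Callable<T> callable) throws Exception {
        if (threadFactory.isCurrentThreadCoordinatorThread()) {
            return callable.call(); // already on the coordinator thread: avoid self-deadlock
        }
        Future<T> future = coordinatorExecutor.submit(callable);
        try {
            return future.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    @Override
    public void close() throws InterruptedException {
        coordinatorExecutor.shutdown();
        coordinatorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        try (StatisticsCoordinatorSketch coordinator = new StatisticsCoordinatorSketch()) {
            coordinator.handleLocalCount(3);
            coordinator.handleLocalCount(4);
            System.out.println(coordinator.total()); // prints 7
        }
    }
}

/** Example subclass: only coordinator-specific logic remains. */
final class StatisticsCoordinatorSketch extends BaseCoordinatorSketch {
    private long totalRecords; // only touched on the coordinator thread

    StatisticsCoordinatorSketch() {
        super("statistics");
    }

    void handleLocalCount(long count) {
        runInEventLoop(() -> totalRecords += count);
    }

    long total() throws Exception {
        return callInCoordinatorThread(() -> totalRecords);
    }
}
```

Because every state change runs on the one coordinator thread, subclasses can mutate fields such as totalRecords without locks; the same-thread check in callInCoordinatorThread keeps it from deadlocking when called from inside the event loop.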



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
