Hi Flink Team,

The Iceberg table maintenance proposal is up for a vote on the Iceberg dev list [1].
Non-binding votes are important too, so if you are interested, please vote.

Thanks,
Peter

[1] - https://lists.apache.org/thread/qhz17ppdbb57ql356j49qqk3nyk59rvm

On Tue, Apr 2, 2024, 08:35 Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Thanks Wenjin for your response.
>
> See my answers below:
>
> On Tue, Apr 2, 2024, 04:08 wenjin <wenjin...@gmail.com> wrote:
>
>> Hi Peter,
>> Thanks a lot for your answers. This detailed explanation has cleared up
>> my confusion and been greatly beneficial to me. If you don't mind, could
>> I discuss two more questions with you?
>>
>> As you mentioned in your proposal and answers, the maintenance tasks may
>> cause resource interference and delay the checkpoint, and in my opinion,
>> they may also backpressure the upstream when performance problems exist.
>> So, would it be better to recommend that users run the maintenance tasks
>> in a separate Flink job?
>
> For some users - small tables, manageable amounts of data - architectural
> simplicity is more important than resource usage. Also, in the long term,
> I hope Autoscaling can help with in-place scaling for these jobs.
>
> But I definitely agree that bigger, more resource-constrained jobs need
> to separate out compaction into another job.
>
>> As you mentioned in your proposal: "We can serialize different
>> maintenance tasks by chaining them together, but maintenance tasks
>> overlapping from consecutive runs also need to be prevented."
>> In my understanding, if maintenance tasks are chained together in one
>> vertex, just like
>> "scheduler1->task1->scheduler2->task2->scheduler3->task3->scheduler4->task4",
>> they will be executed serially, and only after task4 finishes will
>> scheduler1 process the next record. How can the overlapping of
>> maintenance tasks happen?
>
> When I talk about chained tasks, they are not chained into a single
> vertex.
>
> They are using the output of the previous task to start the next task,
> but all of them have multiple operators (some of them with different
> parallelism), which prevents them from being merged into a single vertex.
>
> So overlapping could happen if a new input triggers a parallel scheduling.
>
>> On the other hand, ensuring that maintenance tasks do not run
>> concurrently by chaining them together is not guaranteed, as there may
>> be cases that disable the chain. In this case, I think using tags is a
>> better way than lock mechanisms, for simplicity and ease of use for
>> users.
>>
>> Thanks,
>> Wenjin.
>>
>> On 2024/03/30 13:22:12 Péter Váry wrote:
>> > Hi Wenjin,
>> >
>> > See my answers below:
>> >
>> > On Sat, Mar 30, 2024, 10:54 wenjin <we...@gmail.com> wrote:
>> >
>> > > Hi Peter,
>> > >
>> > > I am interested in your proposal and think making the Iceberg Flink
>> > > connector support running maintenance tasks is meaningful. If
>> > > possible, could you help me clarify a few confusions.
>> > >
>> > > - When the Iceberg table is written by a single Flink job (use cases
>> > > 1, 2), the maintenance tasks will be added to the post-commit
>> > > topology. How do the maintenance tasks execute? Synchronously or
>> > > asynchronously? Will the maintenance tasks block the data processing
>> > > of the Flink job?
>> >
>> > The scheduling and maintenance tasks are just regular Flink operators.
>> > Also, the scheduler will make sure that the maintenance tasks are not
>> > chained to the Iceberg committer, so I would call this asynchronous.
>> > Flink operators do not block other operators, but the maintenance
>> > tasks are competing for resources with the other data processing
>> > tasks. That is why we provide the possibility to define slot sharing
>> > groups for the maintenance tasks. This allows the users to separate
>> > the provided resources as much as Flink allows.
>> >
>> > I have seen only one exception to this separation, where we emit a
>> > high number of records in the maintenance flow, which would cause
>> > delays in starting the checkpoints, but it could be mitigated by
>> > enabling unaligned checkpoints and using AsyncIO. There is one issue
>> > with AsyncIO found by Gyula Fora:
>> > https://issues.apache.org/jira/browse/FLINK-34704 which means that
>> > even with AsyncIO the checkpoint could be blocked until at least one
>> > compaction group is finished.
>> >
>> > > - When the Iceberg table is written by multiple Flink jobs (use
>> > > case 3), the user needs to create a separate Flink job to run the
>> > > maintenance tasks. In this case, if the user does not create a
>> > > separate job, but enables running the maintenance tasks in existing
>> > > Flink jobs just like in use case 1, what would be the consequences?
>> > > Or, is there an automatic mechanism to avoid this issue?
>> >
>> > The user needs to create a new job, or choose a single job to run the
>> > maintenance tasks, to avoid running concurrent instances of the
>> > compaction tasks.
>> > Even if concurrent compaction tasks could be handled, they would be a
>> > serious waste of resources and increase the likelihood of failing
>> > tasks due to concurrent changes on the table. So we do not plan to
>> > support this ATM.
>> >
>> > About the difference between the two scheduling methods:
>> > - In cases 1-2, the scheduling information is coming from the Iceberg
>> > committer - this works for a single writer.
>> > - In case 3, the scheduling information is coming from the monitor -
>> > this works for any number of writers.
>> >
>> > So even if the maintenance tasks are run in one of the jobs, when
>> > there are multiple writers, the scheduling should be based on
>> > monitoring the changes on the table, instead of the information
>> > coming from the committer (which could contain changes only from a
>> > single writer).
>> >
>> > I hope this helps,
>> > Peter
>> >
>> > > Thank you.
>> > >
>> > > Best,
>> > > Wenjin
>> > >
>> > > On 2024/03/28 17:59:49 Péter Váry wrote:
>> > > > Hi Team,
>> > > >
>> > > > I am working on adding a possibility to the Flink Iceberg
>> > > > connector to run maintenance tasks on the Iceberg tables. This
>> > > > will fix the small files issue and, in the long run, help compact
>> > > > the high number of positional and equality deletes created by
>> > > > Flink tasks writing CDC data to Iceberg tables, without the need
>> > > > for Spark in the infrastructure.
>> > > >
>> > > > I did some planning and prototyping, and I am currently trying
>> > > > out the solution on a larger scale.
>> > > >
>> > > > I put together a document describing how my current solution
>> > > > looks:
>> > > > https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>> > > >
>> > > > I would love to hear your thoughts and feedback on this to find
>> > > > a good final solution.
>> > > >
>> > > > Thanks,
>> > > > Peter
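For readers who want to experiment with the resource-separation mechanisms
Péter mentions in the thread (slot sharing groups, breaking operator chains,
unaligned checkpoints), here is a minimal, self-contained Java sketch using
only the plain Flink DataStream primitives involved. The job name, operator
names, and toy data flow are illustrative placeholders, not the proposal's
actual maintenance API:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MaintenanceSeparationSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

            // Unaligned checkpoints let barriers overtake in-flight records,
            // mitigating the checkpoint delays a record-heavy maintenance
            // flow can otherwise cause.
            env.enableCheckpointing(60_000L);
            env.getCheckpointConfig().enableUnalignedCheckpoints();

            // Placeholder for the main data flow (in the proposal this would
            // be the Iceberg writer/committer pipeline).
            DataStream<String> commitTriggers =
                env.fromElements("commit-1", "commit-2");

            // Placeholder maintenance task. Its own slot sharing group keeps
            // it off the slots used by the data flow, and disableChaining()
            // guarantees it is never fused with the upstream (committer)
            // operator into a single vertex.
            commitTriggers
                    .map(trigger -> "compaction triggered by " + trigger)
                    .returns(Types.STRING)
                    .name("maintenance-task")
                    .slotSharingGroup("maintenance")
                    .disableChaining()
                    .print();

            env.execute("maintenance-separation-sketch");
        }
    }

With enough slots available for the "maintenance" group, the maintenance
operator runs on separate slots from the data flow and never shares a task
chain with its upstream operator, which is the separation described above.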
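The overlap problem discussed in the thread (a new trigger arriving while a
previous maintenance run is still in flight) could in principle be guarded
with keyed state. The sketch below is only a hypothetical illustration of
that idea; the proposal itself discusses tags and locking, and this guard,
including the "done" marker convention, is not taken from it:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Hypothetical guard: forwards a trigger only when no maintenance run is
    // marked in flight for this key (e.g. the table), dropping triggers that
    // would overlap. A "done" marker emitted at the end of the maintenance
    // flow clears the mark and allows the next run.
    public class OverlapGuard extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Boolean> inFlight;

        @Override
        public void open(Configuration parameters) {
            inFlight = getRuntimeContext().getState(
                new ValueStateDescriptor<>("maintenance-in-flight", Boolean.class));
        }

        @Override
        public void processElement(String msg, Context ctx, Collector<String> out)
                throws Exception {
            if ("done".equals(msg)) {
                // Maintenance finished; allow new runs for this key.
                inFlight.update(false);
            } else if (!Boolean.TRUE.equals(inFlight.value())) {
                // No run in flight: mark it and forward the trigger.
                inFlight.update(true);
                out.collect(msg);
            }
            // Otherwise a run is in flight and the trigger is dropped.
        }
    }

It would be wired in as something like
triggers.keyBy(t -> tableIdentifier).process(new OverlapGuard()), with the
tail of the maintenance flow feeding "done" markers back on the same key.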