Hi Flink Team,

The Iceberg table maintenance proposal is up for a vote on the Iceberg dev
list [1].

Non-binding votes are important too, so if you are interested, please vote.

Thanks,
Peter

[1] - https://lists.apache.org/thread/qhz17ppdbb57ql356j49qqk3nyk59rvm

On Tue, Apr 2, 2024, 08:35 Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Thanks Wenjin for your response.
>
> See my answers below:
>
> On Tue, Apr 2, 2024, 04:08 wenjin <wenjin...@gmail.com> wrote:
>
>> Hi Peter,
>> Thanks a lot for your answers. This detailed explanation has cleared up
>> my confusion and been greatly beneficial to me. If you don't mind, could
>> I discuss two more questions with you?
>>
>> As you mentioned in your proposal and answers, the maintenance tasks may
>> cause resource interference and delay the checkpoint. In my opinion,
>> they may also backpressure the upstream when there are performance
>> problems. So, would it be better to recommend that users run maintenance
>> tasks in a separate Flink job?
>>
>
> For some users - small tables, manageable amounts of data - architectural
> simplicity is more important than resource usage. Also, in the long term,
> I hope Autoscaling can help with in-place scaling for these jobs.
>
> But I definitely agree that bigger, more resource-constrained jobs need
> to separate out compaction into another job.
>
>
>> As you mentioned in your proposal: "We can serialize different
>> maintenance tasks by chaining them together, but maintenance tasks
>> overlapping from consecutive runs also need to be prevented."
>> In my understanding, if maintenance tasks are chained together in one
>> vertex, like
>> "scheduler1->task1->scheduler2->task2->scheduler3->task3->scheduler4->task4",
>> they will be executed serially, and only after task4 finishes will
>> scheduler1 process the next record. How can maintenance tasks overlap?
>>
>
> When I talk about chained tasks, they are not chained into a single vertex.
>
> They use the output of the previous task to start the next task, but
> each of them consists of multiple operators (some with different
> parallelism), which prevents them from being merged into a single vertex.
>
> So overlapping could happen if a new input triggers a parallel scheduling.
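>
> As a minimal sketch of what I mean (the class names are hypothetical,
> only the chaining behavior is standard Flink): the planner and the
> rewriter run with different parallelism, so Flink cannot merge them into
> one vertex, and they execute as separate tasks:
>
>   triggerStream
>       // plan the file groups to compact, single threaded
>       .process(new PlanFileGroups()).setParallelism(1)
>       // the parallelism change forces a network exchange and
>       // prevents chaining these operators into a single vertex
>       .rebalance()
>       .map(new RewriteFileGroup()).setParallelism(4);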
>
>> On the other hand, ensuring that maintenance tasks do not run
>> concurrently by chaining them together is not guaranteed, as there may
>> be cases that disable the chain. In this case, I think using tags is a
>> better way than lock mechanisms, for simplicity and ease of use.
>>
>> Thanks,
>> Wenjin.
>>
>> On 2024/03/30 13:22:12 Péter Váry wrote:
>> > Hi Wenjin,
>> >
>> > See my answers below:
>> >
>> > On Sat, Mar 30, 2024, 10:54 wenjin <we...@gmail.com> wrote:
>> >
>> > > Hi Peter,
>> > >
>> > > I am interested in your proposal and think making the Iceberg Flink
>> > > Connector support running maintenance tasks is meaningful. If
>> > > possible, could you help me clarify a few confusions?
>> > >
>> > > - When the Iceberg table is written by a single Flink job (use
>> > > cases 1, 2), the maintenance tasks will be added to the post-commit
>> > > topology. How do the maintenance tasks execute? Synchronously or
>> > > asynchronously? Will the maintenance tasks block the data processing
>> > > of the Flink job?
>> > >
>> >
>> > The scheduling and maintenance tasks are just regular Flink operators.
>> > Also, the scheduler will make sure that the maintenance tasks are not
>> > chained to the Iceberg committer, so I would call this asynchronous.
>> > Flink operators do not block other operators, but the maintenance
>> > tasks compete for resources with the other data processing tasks. That
>> > is why we provide the possibility to define slot sharing groups for
>> > the maintenance tasks. This allows the users to separate the provided
>> > resources as much as Flink allows.
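>> >
>> > As a minimal sketch of what this separation could look like (the
>> > RewriteDataFiles operator is hypothetical; slotSharingGroup() is the
>> > actual Flink API):
>> >
>> >   committedStream
>> >       .process(new RewriteDataFiles())        // hypothetical operator
>> >       .name("rewrite-data-files")
>> >       // place the maintenance operator in its own slot sharing group,
>> >       // so it does not share task slots with the main data flow
>> >       .slotSharingGroup("table-maintenance")
>> >       .uid("rewrite-data-files");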
>> >
>> > I have seen only one exception to this separation, where we emit a
>> > high number of records in the maintenance flow, which would cause
>> > delays in starting the checkpoints. It could be mitigated by enabling
>> > unaligned checkpoints and using AsyncIO. There is one issue with
>> > AsyncIO, found by Gyula Fora:
>> > https://issues.apache.org/jira/browse/FLINK-34704, which means that
>> > even with AsyncIO the checkpoint could be blocked until at least one
>> > compaction group is finished.
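>> >
>> > A minimal sketch of the mitigation (enableUnalignedCheckpoints() and
>> > AsyncDataStream are actual Flink APIs; CompactionAsyncFunction is a
>> > hypothetical AsyncFunction doing the compaction work):
>> >
>> >   StreamExecutionEnvironment env =
>> >       StreamExecutionEnvironment.getExecutionEnvironment();
>> >   env.enableCheckpointing(60_000L);
>> >   // let barriers overtake in-flight records, so a busy maintenance
>> >   // flow does not delay checkpoint starts
>> >   env.getCheckpointConfig().enableUnalignedCheckpoints();
>> >
>> >   // run the long compaction step asynchronously, so it does not
>> >   // hold up the operator thread between checkpoints
>> >   DataStream<RewriteResult> results = AsyncDataStream.unorderedWait(
>> >       fileGroups, new CompactionAsyncFunction(),
>> >       10, TimeUnit.MINUTES, 1);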
>> >
>> > > - When the Iceberg table is written by multiple Flink jobs (use case
>> > > 3), the user needs to create a separate Flink job to run the
>> > > maintenance task. In this case, if the user does not create a single
>> > > job, but enables running the maintenance task in existing Flink jobs
>> > > just like use case 1, what would be the consequences? Or, is there
>> > > an automatic mechanism to avoid this issue?
>> > >
>> >
>> > The user needs to create a new job, or choose a single job to run the
>> > maintenance tasks, to avoid running concurrent instances of the
>> > compaction tasks.
>> > Even if concurrent compaction tasks could be handled, they would be a
>> > serious waste of resources and increase the likelihood of failing
>> > tasks due to concurrent changes on the table. So we do not plan to
>> > support this ATM.
>> >
>> > About the difference between the 2 scheduling methods:
>> > - In cases 1-2, the scheduling information comes from the Iceberg
>> > committer - this works for a single writer.
>> > - In case 3, the scheduling information comes from the monitor - this
>> > works for any number of writers.
>> >
>> > So even if the maintenance tasks run in one of the jobs, when there
>> > are multiple writers, the scheduling should be based on monitoring the
>> > changes on the table, instead of on the information coming from the
>> > committer (which could only contain the changes from a single writer).
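>> >
>> > As a rough sketch of the two trigger sources (all of these names are
>> > hypothetical, not the proposal's actual API):
>> >
>> >   // Case 1-2: schedule from the committer's own commit results
>> >   // (sees the changes of a single writer only)
>> >   DataStream<Trigger> committerTriggers =
>> >       commitResults.map(new ToTrigger());
>> >
>> >   // Case 3: schedule by periodically monitoring the table snapshots,
>> >   // which reflect the changes of every writer
>> >   DataStream<Trigger> monitorTriggers = env
>> >       .addSource(new TableChangeMonitorSource(tableLoader))
>> >       .setParallelism(1);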
>> >
>> > I hope this helps,
>> > Peter
>> >
>> >
>> > > Thank you.
>> > >
>> > > Best,
>> > > Wenjin
>> > >
>> > > On 2024/03/28 17:59:49 Péter Váry wrote:
>> > > > Hi Team,
>> > > >
>> > > > I am working on adding a possibility to the Flink Iceberg
>> > > > connector to run maintenance tasks on the Iceberg tables. This
>> > > > will fix the small files issues and, in the long run, help compact
>> > > > the high number of positional and equality deletes created by
>> > > > Flink tasks writing CDC data to Iceberg tables, without the need
>> > > > for Spark in the infrastructure.
>> > > >
>> > > > I did some planning and prototyping, and I am currently trying out
>> > > > the solution on a larger scale.
>> > > >
>> > > > I put together a document on how my current solution looks:
>> > > >
>> > > > https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>> > > >
>> > > > I would love to hear your thoughts and feedback on this to find a
>> > > > good final solution.
>> > > >
>> > > > Thanks,
>> > > > Peter
>> > > >
>> >
>
>
