Hi Peter,

Are you proposing to create a user facing locking feature in Iceberg, or
> just something something for internal use?
>

Since it's a general issue, I'm proposing to create a general user
interface first, while the implementation can be left to users. For
example, we use Airflow to schedule maintenance jobs and we can check
in-progress jobs with the Airflow API. Hive metastore lock might be another
option we can implement for users.

Thanks,
Manu

On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com>
wrote:

> Hi Ajantha,
>
> I thought about enabling post commit topology based compaction for sinks
> using options, like we use for the parametrization of streaming reads [1].
> I think it will be hard to do it in a user friendly way - because of the
> high number of parameters -, but I think it is a possible solution with
> sensible defaults.
>
> There is a batch-like solution for data file compaction already available
> [2], but I do not see how we could extend Flink SQL to be able to call it.
>
> Writing to a branch using Flink SQL should be another thread, but by my
> first guess, it shouldn't be hard to implement using options, like:
> /*+ OPTIONS('branch'='b1') */
> Since writing to branch i already working through the Java API [3].
>
> Thanks, Peter
>
> 1 -
> https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
> 2 -
> https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
> 3 -
> https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>
> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>
>> Thanks for the proposal Peter.
>>
>> I just wanted to know do we have any plans for supporting SQL syntax for
>> table maintenance (like CALL procedure) for pure Flink SQL users?
>> I didn't see any custom SQL parser plugin support in Flink. I also saw
>> that Branch write doesn't have SQL support (only Branch reads use Option),
>> So I am not sure about the roadmap of Iceberg SQL support in Flink.
>> Was there any discussion before?
>>
>> - Ajantha
>>
>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com>
>> wrote:
>>
>>> Hi Manu,
>>>
>>> Just to clarify:
>>> - Are you proposing to create a user facing locking feature in Iceberg,
>>> or just something something for internal use?
>>>
>>> I think we shouldn't add locking to Iceberg's user facing scope in this
>>> stage. A fully featured locking system has many more features that we need
>>> (priorities, fairness, timeouts etc). I could be tempted when we are
>>> talking about the REST catalog, but I think that should be further down the
>>> road, if ever...
>>>
>>> About using the tags:
>>> - I whole-heartedly agree that using tags is not intuitive, and I see
>>> your points in most of your arguments. OTOH, introducing new requirement
>>> (locking mechanism) seems like a wrong direction to me.
>>> - We already defined a requirement (atomic changes on the table) for the
>>> Catalog implementations which could be used to archive our goal here.
>>> - We also already store technical data in snapshot properties in Flink
>>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
>>> properties is not a big stretch.
>>>
>>> Or we can look at these tags or metadata as 'technical data' which is
>>> internal to Iceberg, and shouldn't expressed on the external API. My
>>> concern is:
>>> - Would it be used often enough to worth the additional complexity?
>>>
>>> Knowing that Spark compaction is struggling with the same issue is a
>>> good indicator, but probably we would need more use cases for introducing a
>>> new feature with this complexity, or simpler solution.
>>>
>>> Thanks, Peter
>>>
>>>
>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>
>>>> What would the community think of exploiting tags for preventing
>>>>> concurrent maintenance loop executions.
>>>>
>>>>
>>>> This issue is not specific to Flink maintenance jobs. We have a service
>>>> scheduling Spark maintenance jobs by watching table commits. When we don't
>>>> check in-progress maintenance jobs for the same table, multiple jobs will
>>>> run concurrently and have conflicts.
>>>>
>>>> Basically, I think we need a lock mechanism like the metastore lock
>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>>>> if we want to handle it for users. However, using TAG doesn't look
>>>> intuitive to me. We are also mixing user data with system metadata.
>>>> Maybe we can define some general interfaces and leave the
>>>> implementation to users in the first version.
>>>>
>>>> Regards,
>>>> Manu
>>>>
>>>>
>>>>
>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com>
>>>> wrote:
>>>>
>>>>> What would the community think of exploiting tags for preventing
>>>>> concurrent maintenance loop executions.
>>>>>
>>>>> The issue:
>>>>> Some maintenance tasks couldn't run parallel, like DeleteOrphanFiles
>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We make
>>>>> sure, not to run tasks started by a single trigger concurrently by
>>>>> serializing them, but there are no loops in Flink, so we can't synchronize
>>>>> tasks started by the next trigger.
>>>>>
>>>>> In the document, I describe that we need to rely on the user to ensure
>>>>> that the rate limit is high enough to prevent concurrent triggers.
>>>>>
>>>>> Proposal:
>>>>> When firing a trigger, RateLimiter could check and create an Iceberg
>>>>> table tag [1] for the current table snapshot, with the name:
>>>>> '__flink_maitenance'. When the execution finishes we remove this tag. The
>>>>> RateLimiter keep accumulating changes, and doesn't fire new triggers until
>>>>> it finds this tag on the table.
>>>>> The solution relies on Flink 'forceNonParallel' to prevent concurrent
>>>>> execution of placing the tag, and on Iceberg to store it.
>>>>>
>>>>> This not uses the tags as intended, but seems like a better solution
>>>>> than adding/removing table properties which would clutter the table 
>>>>> history
>>>>> with technical data.
>>>>>
>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>
>>>>> Thanks,
>>>>> Peter
>>>>>
>>>>> [1]
>>>>> https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>
>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> As discussed on yesterday's community sync, I am working on adding a
>>>>>> possibility to the Flink Iceberg connector to run maintenance tasks on 
>>>>>> the
>>>>>> Iceberg tables. This will fix the small files issues and in the long run
>>>>>> help compacting the high number of positional and equality deletes 
>>>>>> created
>>>>>> by Flink tasks writing CDC data to Iceberg tables without the need of 
>>>>>> Spark
>>>>>> in the infrastructure.
>>>>>>
>>>>>> I did some planning, prototyping and currently trying out the
>>>>>> solution on a larger scale.
>>>>>>
>>>>>> I put together a document how my current solution looks like:
>>>>>>
>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>
>>>>>> I would love to hear your thoughts and feedback on this to find a
>>>>>> good final solution.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
>>>>>>
>>>>>

Reply via email to