Hi Peter,

> Are you proposing to create a user-facing locking feature in Iceberg, or
> just something for internal use?
Since it's a general issue, I'm proposing to create a general user interface first, while the implementation can be left to users. For example, we use Airflow to schedule maintenance jobs, and we can check in-progress jobs with the Airflow API. A Hive metastore lock might be another option we can implement for users.

Thanks,
Manu

On Tue, Apr 2, 2024 at 5:26 AM Péter Váry <peter.vary.apa...@gmail.com> wrote:

> Hi Ajantha,
>
> I thought about enabling post-commit topology based compaction for sinks
> using options, like we use for the parametrization of streaming reads [1].
> I think it will be hard to do in a user-friendly way - because of the
> high number of parameters - but I think it is a possible solution with
> sensible defaults.
>
> There is a batch-like solution for data file compaction already available
> [2], but I do not see how we could extend Flink SQL to be able to call it.
>
> Writing to a branch using Flink SQL should be another thread, but my
> first guess is that it shouldn't be hard to implement using options, like:
> /*+ OPTIONS('branch'='b1') */
> since writing to a branch is already working through the Java API [3].
>
> Thanks,
> Peter
>
> 1 - https://iceberg.apache.org/docs/latest/flink-queries/#flink-streaming-read
> 2 - https://github.com/apache/iceberg/blob/820fc3ceda386149f42db8b54e6db9171d1a3a6d/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
> 3 - https://iceberg.apache.org/docs/latest/flink-writes/#branch-writes
>
> On Mon, Apr 1, 2024, 16:30 Ajantha Bhat <ajanthab...@gmail.com> wrote:
>
>> Thanks for the proposal, Peter.
>>
>> I just wanted to know: do we have any plans to support SQL syntax for
>> table maintenance (like a CALL procedure) for pure Flink SQL users?
>> I didn't see any custom SQL parser plugin support in Flink. I also saw
>> that branch writes don't have SQL support (only branch reads use an
>> option), so I am not sure about the roadmap of Iceberg SQL support in
>> Flink.
>> Was there any discussion before?
>>
>> - Ajantha
>>
>> On Mon, Apr 1, 2024 at 7:51 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>
>>> Hi Manu,
>>>
>>> Just to clarify:
>>> - Are you proposing to create a user-facing locking feature in Iceberg,
>>> or just something for internal use?
>>>
>>> I think we shouldn't add locking to Iceberg's user-facing scope at this
>>> stage. A fully featured locking system has many more features that we
>>> would need (priorities, fairness, timeouts, etc.). I could be tempted
>>> when we are talking about the REST catalog, but I think that should be
>>> further down the road, if ever...
>>>
>>> About using the tags:
>>> - I wholeheartedly agree that using tags is not intuitive, and I see
>>> your points in most of your arguments. OTOH, introducing a new
>>> requirement (a locking mechanism) seems like the wrong direction to me.
>>> - We already defined a requirement (atomic changes on the table) for
>>> Catalog implementations, which could be used to achieve our goal here.
>>> - We also already store technical data in snapshot properties in Flink
>>> jobs (JobId, OperatorId, CheckpointId). Maybe technical tags/table
>>> properties are not a big stretch.
>>>
>>> Or we can look at these tags or metadata as 'technical data' which is
>>> internal to Iceberg, and shouldn't be exposed on the external API. My
>>> concern is:
>>> - Would it be used often enough to be worth the additional complexity?
>>>
>>> Knowing that Spark compaction is struggling with the same issue is a
>>> good indicator, but we would probably need more use cases before
>>> introducing a new feature with this complexity, or a simpler solution.
>>>
>>> Thanks,
>>> Peter
>>>
>>> On Mon, Apr 1, 2024, 10:18 Manu Zhang <owenzhang1...@gmail.com> wrote:
>>>
>>>> What would the community think of exploiting tags for preventing
>>>>> concurrent maintenance loop executions?
>>>>
>>>> This issue is not specific to Flink maintenance jobs.
>>>> We have a service scheduling Spark maintenance jobs by watching table
>>>> commits. When we don't check for in-progress maintenance jobs on the
>>>> same table, multiple jobs will run concurrently and have conflicts.
>>>>
>>>> Basically, I think we need a lock mechanism like the metastore lock
>>>> <https://iceberg.apache.org/docs/nightly/configuration/#hadoop-configuration>
>>>> if we want to handle it for users. However, using a tag doesn't look
>>>> intuitive to me. We are also mixing user data with system metadata.
>>>> Maybe we can define some general interfaces and leave the
>>>> implementation to users in the first version.
>>>>
>>>> Regards,
>>>> Manu
>>>>
>>>> On Fri, Mar 29, 2024 at 1:59 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>
>>>>> What would the community think of exploiting tags for preventing
>>>>> concurrent maintenance loop executions?
>>>>>
>>>>> The issue:
>>>>> Some maintenance tasks can't run in parallel, like DeleteOrphanFiles
>>>>> vs. ExpireSnapshots, or RewriteDataFiles vs. RewriteManifestFiles. We
>>>>> make sure not to run tasks started by a single trigger concurrently by
>>>>> serializing them, but there are no loops in Flink, so we can't
>>>>> synchronize with tasks started by the next trigger.
>>>>>
>>>>> In the document, I describe that we need to rely on the user to ensure
>>>>> that the rate limit is high enough to prevent concurrent triggers.
>>>>>
>>>>> Proposal:
>>>>> When firing a trigger, the RateLimiter could check for and create an
>>>>> Iceberg table tag [1] for the current table snapshot, with the name
>>>>> '__flink_maintenance'. When the execution finishes, we remove this tag.
>>>>> The RateLimiter keeps accumulating changes, and doesn't fire new
>>>>> triggers as long as it finds this tag on the table.
>>>>> The solution relies on Flink's 'forceNonParallel' to prevent concurrent
>>>>> execution of placing the tag, and on Iceberg to store it.
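The check-and-create guard Peter describes above can be sketched in plain Java. This is only a minimal simulation: the table's tags are mocked with an in-memory set, and the class and method names are hypothetical; a real implementation would go through Iceberg's `Table.manageSnapshots().createTag(...)` / `removeTag(...)` and rely on the catalog's atomic commit for the check-and-set.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical sketch of the tag-based guard in the RateLimiter.
 * The table's tags are mocked with an in-memory set here; a real
 * implementation would use Iceberg's ManageSnapshots API instead.
 */
class TagGuard {
  static final String MAINTENANCE_TAG = "__flink_maintenance";

  // Stand-in for the tags stored on the Iceberg table.
  private final Set<String> tableTags = ConcurrentHashMap.newKeySet();

  /** Fire a trigger only if no maintenance run is marked in-flight. */
  boolean tryFireTrigger() {
    // add() is atomic and returns false if the tag already exists,
    // in which case the RateLimiter keeps accumulating changes.
    return tableTags.add(MAINTENANCE_TAG);
  }

  /** Called when the chain of maintenance tasks has finished. */
  void finishMaintenance() {
    tableTags.remove(MAINTENANCE_TAG);
  }
}
```

In the real topology, `tryFireTrigger()` would run inside the `forceNonParallel` operator mentioned above, so at most one instance ever attempts the tag commit at a time.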
>>>>> This doesn't use tags as intended, but it seems like a better solution
>>>>> than adding/removing table properties, which would clutter the table
>>>>> history with technical data.
>>>>>
>>>>> Your thoughts? Any other suggestions/solutions would be welcome.
>>>>>
>>>>> Thanks,
>>>>> Peter
>>>>>
>>>>> [1] https://iceberg.apache.org/docs/latest/java-api-quickstart/#branching-and-tagging
>>>>>
>>>>> On Thu, Mar 28, 2024, 14:44 Péter Váry <peter.vary.apa...@gmail.com> wrote:
>>>>>
>>>>>> Hi Team,
>>>>>>
>>>>>> As discussed on yesterday's community sync, I am working on adding the
>>>>>> possibility to run maintenance tasks on Iceberg tables from the Flink
>>>>>> Iceberg connector. This will fix the small-files issue and, in the
>>>>>> long run, help compact the high number of positional and equality
>>>>>> deletes created by Flink tasks writing CDC data to Iceberg tables,
>>>>>> without needing Spark in the infrastructure.
>>>>>>
>>>>>> I did some planning and prototyping, and I am currently trying out the
>>>>>> solution on a larger scale.
>>>>>>
>>>>>> I put together a document describing how my current solution looks:
>>>>>> https://docs.google.com/document/d/16g3vR18mVBy8jbFaLjf2JwAANuYOmIwr15yDDxovdnA/edit?usp=sharing
>>>>>>
>>>>>> I would love to hear your thoughts and feedback on this, to find a
>>>>>> good final solution.
>>>>>>
>>>>>> Thanks,
>>>>>> Peter
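The general locking interface Manu suggests earlier in the thread, with the implementation left to users (for example backed by the Airflow API or a Hive metastore lock), could look something like the following sketch. All names here are hypothetical and exist in no Iceberg API; the in-memory implementation only stands in for a user-provided one.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical user-facing interface for guarding a table against
 * concurrent maintenance jobs; the implementation is pluggable.
 */
interface MaintenanceLock {
  /**
   * Try to mark a maintenance job as in progress for the table.
   * Returns false if another job already holds the lock.
   */
  boolean tryLock(String tableName);

  /** Release the lock after the maintenance job finishes. */
  void unlock(String tableName);
}

/**
 * Trivial in-memory implementation, standing in for one backed by
 * the Airflow API, a Hive metastore lock, or a catalog.
 */
class InMemoryMaintenanceLock implements MaintenanceLock {
  private final Map<String, Boolean> locks = new ConcurrentHashMap<>();

  @Override
  public boolean tryLock(String tableName) {
    // putIfAbsent returns null only when no lock was held before,
    // which makes the check-and-acquire step atomic.
    return locks.putIfAbsent(tableName, Boolean.TRUE) == null;
  }

  @Override
  public void unlock(String tableName) {
    locks.remove(tableName);
  }
}
```

A scheduler would call `tryLock("db.table")` before submitting a compaction job and skip the run when it returns false, which is exactly the in-progress check Manu's Airflow-based service performs today.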