Hi Vinoth,

Thanks for your reply!

Regarding the first part, I agree that precombine solves a lot of issues,
especially during the ingestion.
I think this is a valid behavior and should be preserved so that we can
enjoy out-of-order events and duplicates handled by the framework.
I'm also aware there are many use-cases people want to solve with Hudi and
it ranges from  "i don't need precombine field" to "i need many precombine
fields" or "my deduplication logic is complex and I just need to do it
ourselves before write and outside Hudi".

Maybe I should share my usecase to show where this topic comes from and
what's my point of view.
I use precombine myself for handling events delivered from various
applications that are delivered using kafka. These are stored in Hudi table.
Then once these are stored, I have a nice deduplicated dataset that I can
use to create derived tables (App -> Kafka -> Hudi table "raw" -> n derived
Hudi tables). Then in the second part (hudi raw -> hudi derived) I can
safely assume there will be no duplicates and out-of-order events coming
from the first Hudi table, therefore I don't really need precombine field
and it makes modeling this derived table easier as I don't need to consider
how precombine will behave. At the same time, it does not make sense to
introduce another tool and grow my stack for it if Hudi can handle it.

Sometimes I just don't have any field in the source system/table that can
be used as precombine field and am forced to provide Hudi it with something
like "ingestion/processing timestamp" or some constant.


Regarding the proposal comments, I agree overall and understand that these
are not easy choices to make.
>From a user perspective, it would be great to have coherent APIs with
consistent behavior and no surprises. I agree with Ken, there
are operations where it does not feel like precombine should be needed (or
should be something internal and abstracted away from the user).
You're right Vinoth and I was wrong, the update does not deduplicate
existing records, as I checked it will rather get the latest record based
on percombine field, update values, and replace duplicates with it.
Regarding the RDBMS lense, some systems introduce PK NOT ENFORCED mode to
allow duplicates on insert, but always deduplicate on update. If users want
to preserve duplicates but also want to update values, they need to delete
and insert them again on their own. This is a clean way to do it imo, but
is opinionated and some may dislike it, at the same PK uniqueness and
deduplication is at Hudi core principles and is a differentiator.
This article from MS Synapse has nice examples around this:
https://learn.microsoft.com/en-us/azure/synapse-analytics/sql-data-warehouse/sql-data-warehouse-table-constraints
I myself have cases where I care about not missing any data and decide to
store duplicates and handle these later. I think right now Hudi gives us
nice control here with insert modes. If one thinks Hudi should fail on
write, then strict insert mode does the trick but is not a default.


I gathered some JIRA tickets related to precombine/no precombine model:
https://issues.apache.org/jira/browse/HUDI-2633
https://issues.apache.org/jira/browse/HUDI-4701
https://issues.apache.org/jira/browse/HUDI-5848
https://issues.apache.org/jira/browse/HUDI-2681

But maybe you'd like to have a new ticket for this work?
I'm always happy to help.

BR,
Daniel

śr., 5 kwi 2023 o 18:59 Ken Krugler <kkrugler_li...@transpac.com>
napisał(a):

> Hi Vinoth,
>
> I just want to make sure my issue was clear - it seems like Spark
> shouldn’t be requiring a precombined field (or checking that it exists)
> when dropping partitions.
>
> Thanks,
>
> — Ken
>
>
> > On Apr 4, 2023, at 7:31 AM, Vinoth Chandar <vin...@apache.org> wrote:
> >
> > Thanks for raising this issue.
> >
> > Love to use this opp to share more context on why the preCombine field
> > exists.
> >
> >   - As you probably inferred already, we needed to eliminate duplicates,
> >   while dealing with out-of-order data (e.g database change records
> arriving
> >   in different orders from two Kafka clusters in two zones). So it was
> >   necessary to preCombine by a "event" field, rather than just the
> arrival
> >   time (which is what _hoodie_commit_time is).
> >   - This comes from stream processing concepts like
> >   https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
> >   which build upon inadequacies in traditional database systems to deal
> with
> >   things like this. At the end of the day, we are solving a "processing"
> >   problem IMO with Hudi - Hudi replaces existing batch/streaming
> pipelines,
> >   not OLTP databases. That's at-least the lens we approached it from.
> >   - For this to work end-end, it is not sufficient to just precombine
> >   within a batch of incoming writes, we also need to consistently apply
> the
> >   same against data in storage. In CoW, we implicitly merge against
> storage,
> >   so its simpler. But for MoR, we simply append records to log files, so
> we
> >   needed to make this a table property - such that queries/compaction can
> >   later do the right preCombine. Hope that clarifies the CoW vs MoR
> >   differences.
> >
> > On the issues raised/proposals here.
> >
> >   1. I think we need some dedicated efforts across the different writer
> >   paths to make it easier. probably some lower hanging fruits here. Some
> of
> >   it results from just different authors contributing to different code
> paths
> >   in an OSS project.
> >   2. On picking a sane default precombine field. _hoodie_commit_time is a
> >   good candidate for preCombine field, as you point out, we would just
> pick
> >   1/many records with the same key arbitrarily, in that scenario. On
> >   storage/across commits, we would pick the value with the latest
> >   commit_time/last writer wins - which would make queries repeatedly
> provide
> >   the same consistent values as well.  Needs more thought.
> >   3. If the user desires to customize this behavior, they could supply a
> >   preCombine field that is different. This would be similar to semantics
> of
> >   event time vs arrival order processing in streaming systems.
> Personally, I
> >   need to spend a bit more time digging to come up with an elegant
> solution
> >   here.
> >   4. For the proposals on how Hudi could de-duplicate, after the fact
> that
> >   inserts introduced duplicates - I think the current behavior is a bit
> more
> >   condoning than what I'd like tbh. It updates both the records IIRC. I
> think
> >   Hudi should ensure record key uniqueness across different paths and
> fail
> >   the write if it's violated. - if we think of this as in RDBMS lens,
> that's
> >   what would happen, correct?
> >
> >
> > Love to hear your thoughts. If we can file a JIRA or compile JIRAs with
> > issues around this, we could discuss out short, long term plans?
> >
> > Thanks
> > Vinoth
> >
> > On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler <kkrugler_li...@transpac.com>
> > wrote:
> >
> >> Hi Daniel,
> >>
> >> Thanks for the detailed write-up.
> >>
> >> I can’t add much to the discussion, other than noting we also recently
> ran
> >> into the related oddity that we don’t need to define a precombine when
> >> writing data to a COW table (using Flink), but then trying to use Spark
> to
> >> drop partitions failed because there’s a default precombine field name
> (set
> >> to “ts”), and if that field doesn’t exist then the Spark job fails.
> >>
> >> — Ken
> >>
> >>
> >>> On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski <d.kazmir...@gmail.com>
> >> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> I would like to bring up the topic of how precombine field is used and
> >>> what's the purpose of it. I would also like to know what are the plans
> >> for
> >>> it in the future.
> >>>
> >>> At first glance precombine filed looks like it's only used to
> deduplicate
> >>> records in incoming batch.
> >>> But when digging deeper it looks like it can/is also be used to:
> >>> 1. combine records not before but on write to decide if update existing
> >>> record (eg with DefaultHoodieRecordPayload)
> >>> 2. combine records on read for MoR table to combine log and base files
> >>> correctly.
> >>> 3. precombine field is required for spark SQL UPDATE, even if user
> can't
> >>> provide duplicates anyways with this sql statement.
> >>>
> >>> Regarding [3] there's inconsistency as precombine field is not required
> >> in
> >>> MERGE INTO UPDATE. Underneath UPSERT is switched to INSERT in upsert
> mode
> >>> to update existing records.
> >>>
> >>> I know that Hudi does a lot of work to ensure PK uniqueness
> across/within
> >>> partitions and there is a need to deduplicate records before write or
> to
> >>> deduplicate existing data if duplicates were introduced eg when using
> >>> non-strict insert mode.
> >>>
> >>> What should then happen in a situation where user does not want or can
> >> not
> >>> provide a pre-combine field? Then it's on user not to introduce
> >> duplicates,
> >>> but makes Hudi more generic and easier to use for "SQL" people.
> >>>
> >>> No precombine is possible for CoW, already, but UPSERT and SQL UPDATE
> is
> >>> not supported (but users can update records using Insert in non-strict
> >> mode
> >>> or MERGE INTO UPDATE).
> >>> There's also a difference between CoW and MoR where for MoR
> >>> precombine field is a hard requirement, but is optional for CoW.
> >>> (UPDATES with no precombine are also possible in Flink for both CoW and
> >> MoR
> >>> but not in Spark.)
> >>>
> >>> Would it make sense to take inspiration from some DBMS systems then (eg
> >>> Synapse) to allow updates and upserts when no precombine field is
> >> specified?
> >>> Scenario:
> >>> Say that duplicates were introduced with Insert in non-strict mode, no
> >>> precombine field is specified, then we have two options:
> >>> option 1) on UPDATE/UPSERT Hudi should deduplicate the existing
> records,
> >> as
> >>> there's no precombine field it's expected we don't know which records
> >> will
> >>> be removed and which will be effectively updated and preserved in the
> >>> table. (This can be also achieved by always providing the same value in
> >>> precombine field for all records.)
> >>> option 2) on UPDATE/UPSERT Hudi should deduplicate the existing
> records,
> >> as
> >>> there's no precombine field, record with the latest _hoodie_commit_time
> >> is
> >>> preserved and updated, other records with the same PK are removed.
> >>>
> >>> In both cases, deduplication on UPDATE/UPSERT becomes a hard rule
> >>> whether we use precombine field or not.
> >>>
> >>> Then regarding MoR and merging records on read (found this in Hudi
> format
> >>> spec), can it be done by only using _hoodie_commit_time in absence of
> >>> precombine field?
> >>> If so for both MoR and CoW precombine field can become completely
> >> optional?
> >>>
> >>> I'm of course looking at it more from the user perspective, it would be
> >>> nice to know what is and what is not possible from the design and
> >> developer
> >>> perspective.
> >>>
> >>> Best Regards,
> >>> Daniel Kaźmirski
> >>
> >> --------------------------
> >> Ken Krugler
> >> http://www.scaleunlimited.com
> >> Custom big data solutions
> >> Flink, Pinot, Solr, Elasticsearch
> >>
> >>
> >>
> >>
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> Custom big data solutions
> Flink, Pinot, Solr, Elasticsearch
>
>
>
>

-- 
Daniel Kaźmirski

Reply via email to