Hi Vinoth,

I just want to make sure my issue was clear: it seems like Spark shouldn’t be 
requiring a precombine field (or checking that it exists) when dropping 
partitions.
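
For reference, this is roughly the shape of the call we're making (the option
names are the Spark datasource ones as I remember them; the table name,
partition value, and path are placeholders):

    // Drop a partition from an existing Hudi table via the Spark datasource.
    // No new records are written, yet the write fails when the table schema
    // has no "ts" column, because "ts" is the default precombine field name.
    spark.emptyDataFrame.write
      .format("hudi")
      .option("hoodie.table.name", "events")
      .option("hoodie.datasource.write.operation", "delete_partition")
      .option("hoodie.datasource.write.partitions.to.delete", "dt=2023-03-01")
      .mode("append")
      .save("s3://bucket/tables/events")

    // Workaround we'd rather not need for a partition drop: point the
    // precombine config at a column that actually exists.
    // .option("hoodie.datasource.write.precombine.field", "event_time")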

Thanks,

— Ken


> On Apr 4, 2023, at 7:31 AM, Vinoth Chandar <vin...@apache.org> wrote:
> 
> Thanks for raising this issue.
> 
> I'd love to use this opportunity to share more context on why the preCombine
> field exists.
> 
>   - As you probably inferred already, we needed to eliminate duplicates
>   while dealing with out-of-order data (e.g. database change records arriving
>   in different orders from two Kafka clusters in two zones). So it was
>   necessary to preCombine by an "event" field, rather than just the arrival
>   time (which is what _hoodie_commit_time is).
>   - This comes from stream processing concepts like
>   https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/ ,
>   which grew out of the inadequacies of traditional database systems in
>   dealing with things like this. At the end of the day, we are solving a
>   "processing" problem IMO with Hudi - Hudi replaces existing batch/streaming
>   pipelines, not OLTP databases. That's at least the lens we approached it
>   from.
>   - For this to work end-to-end, it is not sufficient to just preCombine
>   within a batch of incoming writes; we also need to consistently apply the
>   same logic against data in storage. In CoW, we implicitly merge against
>   storage, so it's simpler. But for MoR, we simply append records to log
>   files, so we needed to make this a table property - such that
>   queries/compaction can later do the right preCombine. Hope that clarifies
>   the CoW vs MoR differences.
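> 
>   To make that rule concrete, a minimal sketch of the merge described above
>   (field names and types here are illustrative, not Hudi's actual payload
>   API):
> 
>     // Between two versions of the same key, prefer the higher ordering
>     // (preCombine) value; fall back to the later commit time on a tie.
>     case class Rec(key: String, orderingVal: Long, commitTime: String)
> 
>     def preCombine(a: Rec, b: Rec): Rec =
>       if (a.orderingVal != b.orderingVal) {
>         if (a.orderingVal > b.orderingVal) a else b // the "event" field wins
>       } else {
>         if (a.commitTime >= b.commitTime) a else b  // last writer wins
>       }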
> 
> On the issues and proposals raised here:
> 
>   1. I think we need some dedicated effort across the different writer
>   paths to make this easier. There is probably some low-hanging fruit here.
>   Some of it results from different authors contributing to different code
>   paths in an OSS project.
>   2. On picking a sane default precombine field: _hoodie_commit_time is a
>   good candidate. As you point out, within a batch we would then just pick
>   one of the many records with the same key arbitrarily. On storage/across
>   commits, we would pick the value with the latest commit_time (last writer
>   wins), which would also make repeated queries return the same consistent
>   values. Needs more thought.
>   3. If the user desires to customize this behavior, they could supply a
>   different preCombine field. This would be similar to the semantics of
>   event-time vs arrival-order processing in streaming systems. Personally, I
>   need to spend a bit more time digging to come up with an elegant solution
>   here.
>   4. On the proposals for how Hudi could de-duplicate after inserts have
>   introduced duplicates - I think the current behavior is a bit more
>   permissive than I'd like, tbh. It updates both records, IIRC. I think
>   Hudi should ensure record key uniqueness across the different paths and
>   fail the write if it's violated; if we look at this through an RDBMS lens,
>   that's what would happen, correct?
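> 
>   As a sketch of what (4) could look like as a pre-write check (this is not
>   existing Hudi behavior, and the DataFrame/column names are made up):
> 
>     import org.apache.spark.sql.DataFrame
>     import org.apache.spark.sql.functions.col
> 
>     // Reject the incoming batch if any record key occurs more than once.
>     def failOnDuplicateKeys(batch: DataFrame, keyCol: String): Unit = {
>       val dupes = batch.groupBy(keyCol).count()
>         .where(col("count") > 1).count()
>       require(dupes == 0, s"Write rejected: $dupes duplicate record keys")
>     }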
> 
> 
> I'd love to hear your thoughts. If we can file a JIRA or compile JIRAs with
> the issues around this, we could discuss our short- and long-term plans?
> 
> Thanks
> Vinoth
> 
> On Sat, Apr 1, 2023 at 3:13 PM Ken Krugler <kkrugler_li...@transpac.com>
> wrote:
> 
>> Hi Daniel,
>> 
>> Thanks for the detailed write-up.
>> 
>> I can’t add much to the discussion, other than noting that we also recently
>> ran into a related oddity: we don’t need to define a precombine field when
>> writing data to a CoW table (using Flink), but then trying to use Spark to
>> drop partitions failed, because there’s a default precombine field name (set
>> to “ts”), and if that field doesn’t exist the Spark job fails.
>> 
>> — Ken
>> 
>> 
>>> On Mar 31, 2023, at 1:20 PM, Daniel Kaźmirski <d.kazmir...@gmail.com>
>> wrote:
>>> 
>>> Hi all,
>>> 
>>> I would like to bring up the topic of how the precombine field is used and
>>> what its purpose is. I would also like to know what the plans for it are
>>> going forward.
>>> 
>>> At first glance the precombine field looks like it's only used to
>>> deduplicate records in the incoming batch.
>>> But when digging deeper it looks like it can also be / is also used to:
>>> 1. combine records not before but on write, to decide whether to update an
>>> existing record (e.g. with DefaultHoodieRecordPayload)
>>> 2. combine records on read for a MoR table, to combine log and base files
>>> correctly.
>>> 3. satisfy Spark SQL UPDATE, which requires a precombine field even though
>>> the user can't introduce duplicates with this SQL statement anyway.
>>> 
>>> Regarding [3], there's an inconsistency, as the precombine field is not
>>> required in MERGE INTO UPDATE. Underneath, UPSERT is switched to INSERT in
>>> upsert mode to update existing records.
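>>> 
>>> For example (table and column names made up), on a table created without a
>>> precombine field, the UPDATE below fails while the MERGE INTO works, as
>>> described above:
>>> 
>>>     spark.sql("UPDATE hudi_tbl SET price = 10 WHERE uuid = 'a1'")
>>> 
>>>     spark.sql("""
>>>       MERGE INTO hudi_tbl t
>>>       USING (SELECT 'a1' AS uuid, 10 AS price) s
>>>       ON t.uuid = s.uuid
>>>       WHEN MATCHED THEN UPDATE SET t.price = s.price
>>>     """)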
>>> 
>>> I know that Hudi does a lot of work to ensure PK uniqueness across/within
>>> partitions, and there is a need to deduplicate records before a write, or
>>> to deduplicate existing data if duplicates were introduced, e.g. when using
>>> non-strict insert mode.
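>>> 
>>> (For reference, the switch I have in mind is the session config, name from
>>> memory:
>>> 
>>>     spark.sql("SET hoodie.sql.insert.mode = non-strict")
>>> 
>>> which allows inserts that duplicate an existing key.)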
>>> 
>>> What should then happen in a situation where the user does not want to, or
>>> cannot, provide a precombine field? It would then be on the user not to
>>> introduce duplicates, but it makes Hudi more generic and easier to use for
>>> "SQL" people.
>>> 
>>> Having no precombine field is already possible for CoW, but then UPSERT and
>>> SQL UPDATE are not supported (users can still update records using INSERT
>>> in non-strict mode or MERGE INTO UPDATE).
>>> There's also a difference between CoW and MoR, where for MoR the
>>> precombine field is a hard requirement but it is optional for CoW.
>>> (UPDATEs with no precombine field are also possible in Flink for both CoW
>>> and MoR, but not in Spark.)
>>> 
>>> Would it make sense, then, to take inspiration from some DBMS systems (e.g.
>>> Synapse) and allow updates and upserts when no precombine field is
>>> specified?
>>> Scenario:
>>> Say that duplicates were introduced with INSERT in non-strict mode and no
>>> precombine field is specified; then we have two options:
>>> option 1) on UPDATE/UPSERT Hudi should deduplicate the existing records; as
>>> there's no precombine field, it's expected that we don't know which records
>>> will be removed and which will be effectively updated and preserved in the
>>> table. (This can also be achieved by always providing the same value in the
>>> precombine field for all records.)
>>> option 2) on UPDATE/UPSERT Hudi should deduplicate the existing records; as
>>> there's no precombine field, the record with the latest _hoodie_commit_time
>>> is preserved and updated, and other records with the same PK are removed.
>>> 
>>> In both cases, deduplication on UPDATE/UPSERT becomes a hard rule,
>>> whether we use a precombine field or not.
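>>> 
>>> As an illustration of option 2's selection rule over existing data (the
>>> meta column names are Hudi's, everything else is made up, and this is not
>>> an existing Hudi operation):
>>> 
>>>     import org.apache.spark.sql.expressions.Window
>>>     import org.apache.spark.sql.functions.{col, row_number}
>>> 
>>>     // Keep, per record key, the copy with the latest _hoodie_commit_time.
>>>     val w = Window.partitionBy("_hoodie_record_key")
>>>       .orderBy(col("_hoodie_commit_time").desc)
>>> 
>>>     val deduped = spark.read.format("hudi").load("/path/to/table")
>>>       .withColumn("rn", row_number().over(w))
>>>       .where(col("rn") === 1)
>>>       .drop("rn")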
>>> 
>>> Then, regarding MoR and merging records on read (I found this in the Hudi
>>> format spec): can it be done using only _hoodie_commit_time in the absence
>>> of a precombine field?
>>> If so, could the precombine field become completely optional for both MoR
>>> and CoW?
>>> 
>>> I'm of course looking at it more from the user perspective; it would be
>>> nice to know what is and what is not possible from the design and developer
>>> perspective.
>>> 
>>> Best Regards,
>>> Daniel Kaźmirski
>> 
>> --------------------------
>> Ken Krugler
>> http://www.scaleunlimited.com
>> Custom big data solutions
>> Flink, Pinot, Solr, Elasticsearch
>> 
>> 
>> 
>> 

--------------------------
Ken Krugler
http://www.scaleunlimited.com
Custom big data solutions
Flink, Pinot, Solr, Elasticsearch


