Hi Felix,

Good to hear from you. So, trying to understand your requirement. If we
sorted all inserts in commit, by some configurable list of fields, while
sending the updates to their respective file groups, would that suffice?

Then you could periodically cluster if you need to change the sort order
more globally across the entire partition/table.

Satish, if this is the case, it can be one more "layout optimization"
strategy which is sort based, as opposed to bucketing/hash modulo grouping.

This is an awesome discussion folks, let keep going :)

Thanks
Vinoth

On Tue, Feb 9, 2021 at 5:44 PM Kizhakkel Jose, Felix
<felix.j...@philips.com.invalid> wrote:

> Hello All,
> I would like to  sort records in each file on COW table by a given key
> while ingesting/writing data - I am using Spark Data source + Kafka
> (Structured Streaming).
> HUDI is doing a great thing of getting each file to the optimal file size,
> (by compaction and appending data to smaller files) so when we get a file
> in its optimal file size the records in that file also should be sorted if
> a sort/order-by column is provided. So I don’t have to do another round of
> processing on the optimized file.
>
> Regards,
> Felix K Jose
> From: Rubens Rodrigues <rubenssoto2...@gmail.com>
> Date: Tuesday, February 9, 2021 at 8:36 PM
> To: dev@hudi.apache.org <dev@hudi.apache.org>
> Subject: Re: [DISCUSS] Improve data locality during ingestion
> Caution: This e-mail originated from outside of Philips, be careful for
> phishing.
>
>
> Hi guys,
>
> Talking about my use case...
>
> I have datasets that ordering data by date makes a lot sense or ordering by
> some id to have less touched files on merge operations.
> On my use of delta lake I used to bootstrap tables ever ordering by one of
> these fields and helps a lot on file pruning.
>
> Hudi clustering do this job but I think it is an unnecessary extra step to
> do after bulk insert because all data will need to be rewrite again.
>
>
>
> Em ter, 9 de fev de 2021 21:53, Vinoth Chandar <vin...@apache.org>
> escreveu:
>
> > Hi Satish,
> >
> > Been to respond to this. I think I like the idea overall.
> >
> > Here's a (hopefully) my understanding version and let me know if I am
> > getting this right.
> >
> > Predominantly, we are just talking about the problem of: where do we send
> > the "inserts" to.
> >
> > Today the upsert partitioner does the file sizing/bin-packing etc for
> > inserts and then sends some inserts over to existing file groups to
> > maintain file size.
> > We can abstract all of this into strategies and some kind of pipeline
> > abstractions and have it also consider "affinity" to an existing file
> group
> > based
> > on say information stored in the metadata table?
> >
> > I think this is complimentary to what we do today and can be helpful.
> First
> > thing may be is to abstract the existing write pipeline as a series of
> > "optimization"
> > stages and bring things like file sizing under that.
> >
> > On bucketing, I am not against Hive bucketing or anything. But with
> record
> > level indexes and granular/micro partitions that we can achieve using
> > clustering, is it still the most efficient design? That's a question I
> > would love to find answers for. I never liked the static/hash
> partitioning
> > based schemes
> > in bucketing. they introduce  a lot of manual data munging, if things
> > change.
> >
> > Thanks
> > Vinoth
> >
> >
> >
> > On Wed, Feb 3, 2021 at 5:17 PM Satish Kotha <satishko...@uber.com.invalid
> >
> > wrote:
> >
> > > I got some feedback that this thread may be a bit complex to
> understand.
> > So
> > > I tried to simplify proposal to below:
> > >
> > > Users can already specify 'partitionpath' using this config
> > > <
> > >
> >
> https://eur01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fhudi.apache.org%2Fdocs%2Fconfigurations.html%23PARTITIONPATH_FIELD_OPT_KEY&amp;data=04%7C01%7C%7C5ccc16d6386a45c743be08d8cd644933%7C1a407a2d76754d178692b3ac285306e4%7C0%7C0%7C637485177900518499%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=luyQHXq8pe2V9ojNuxZJRL60HZBpqoIi%2BjhGjCxvdGs%3D&amp;reserved=0
> > > >
> > > when
> > > writing data. My proposal is we also give users the ability to identify
> > (or
> > > hint at) 'fileId' to while writing the data. For example, users can
> > > say 'locality.columns:
> > > session_id'. We deterministically map every session_id to a specific
> > > fileGroup in hudi (using hash-modulo or range-partitioning etc). So all
> > > values for a session_id are co-located in the same data/log file.
> > >
> > > Hopefully, this explains the idea better. Appreciate any feedback.
> > >
> > > On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <satishko...@uber.com>
> > wrote:
> > >
> > > > Hello,
> > > >
> > > > Clustering <
> https://eur01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fhudi.apache.org%2Fblog%2Fhudi-clustering-intro%2F&amp;data=04%7C01%7C%7C5ccc16d6386a45c743be08d8cd644933%7C1a407a2d76754d178692b3ac285306e4%7C0%7C0%7C637485177900518499%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=breYLo6tMcfMz%2BhJgS8mU9zbzKHLyCIzd48Cs3iJiCk%3D&amp;reserved=0>
> is a
> > > > great feature for improving data locality. But it has a (relatively
> > big)
> > > > cost to rewrite the data after ingestion. I think there are other
> ways
> > to
> > > > improve data locality during ingestion. For example, we can add a new
> > > Index
> > > > (or partitioner) that reads values for columns that are important
> from
> > a
> > > > data locality perspective. We could then compute hash modulo on the
> > value
> > > > and use that to deterministically identify the file group that the
> > record
> > > > has to be written into.
> > > >
> > > > More detailed example:
> > > > Assume we introduce 2 new config:
> > > > hoodie.datasource.write.num.file.groups: "N" #Controls the total
> number
> > > of
> > > > file Ids allowed (per partition).
> > > >
> > > > hoodie.datasource.write.locality.columns: "session_id,timestamp"
> > > #Identify
> > > > columns that are important for data locality.
> > > >
> > > > (I can come up with better names for config if the general idea
> sounds
> > > > good).
> > > >
> > > > During ingestion, we generate 'N' fileIds for each partition (if that
> > > > partition has already K fileIds, we generate N-K new fileIds). Let's
> > say
> > > > these fileIds are stored in fileIdList data structure. For each row,
> we
> > > > compute 'hash(row.get(session_id)+row.get(timestamp)) % N'.  This
> value
> > > is
> > > > used as the index into fileIdList data structure to deterministically
> > > > identify the file group for the row.
> > > >
> > > > This improves data locality by ensuring columns with a given value
> are
> > > > stored in the same file. This hashing could be done in two places:
> > > > 1) A custom index that tags location for each row based on values for
> > > > 'session_id+timestamp'.
> > > > 2) In a new partitioner that assigns buckets for each row based of
> > values
> > > > for 'session_id+timestamp'
> > > >
> > > > *Advantages:*
> > > > 1) No need to rewrite data for improving data locality.
> > > > 2) Integrates well with hive bucketing (spark is also adding support
> > for
> > > > hive bucketing <
> https://eur01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FSPARK-19256&amp;data=04%7C01%7C%7C5ccc16d6386a45c743be08d8cd644933%7C1a407a2d76754d178692b3ac285306e4%7C0%7C0%7C637485177900518499%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000&amp;sdata=pYt8dxxnjBi5iwCfZiXffDaDQ2Tv53x4Oxfzo3zzah4%3D&amp;reserved=0
> >)
> > > > 3) This reduces scan cycles to find a particular key because this
> > ensures
> > > > that the key is present in a certain fileId. Similarly, joining
> across
> > > > multiple tables would be efficient if they both choose the same
> > > > 'locality.columns'.
> > > >
> > > > *Disadvantages:*
> > > > 1) Users need to know the total number of filegroups to generate per
> > > > partition. This value is assumed to be static for all partitions. So
> if
> > > > significant changes are expected in traffic volume, this may not
> > > partition
> > > > the data well.  (We could also consider making this static per
> > partition,
> > > > which adds additional complexity, but feasible to do)
> > > > 2) This may not be as efficient as clustering. For example, data for
> a
> > > > given column value is guaranteed to be co-located in the same file.
> > But
> > > > they may not be in the same block (row group in parquet).  So more
> > blocks
> > > > need to be read by query engines.
> > > >
> > > >
> > > > Clustering can still be useful for other use cases such as stitching
> > > > files, transforming data for efficiency etc. Clustering can also be
> > > useful
> > > > for a few sorting scenarios - e.g., if users cannot predict a good
> > value
> > > > for the number of file groups needed.
> > > >
> > > > Appreciate any feedback. Let me know if you have other ideas on
> > improving
> > > > data locality. If you are interested in this idea and want to
> > > collaborate,
> > > > please reach out.
> > > >
> > > > Thanks
> > > > Satish
> > > >
> > >
> >
>
> ________________________________
> The information contained in this message may be confidential and legally
> protected under applicable law. The message is intended solely for the
> addressee(s). If you are not the intended recipient, you are hereby
> notified that any use, forwarding, dissemination, or reproduction of this
> message is strictly prohibited and may be unlawful. If you are not the
> intended recipient, please contact the sender by return e-mail and destroy
> all copies of the original message.
>

Reply via email to