I am definitely not looking for one file per sort column/sub-partition column
value, because then you lose the concept of optimal file sizing, which works
pretty well with Hudi.

So what I am proposing is that whenever we create or update a file, the
records within that file are sorted by the given sort key, so that range
pruning works much better.
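
To illustrate why sorting within a file helps range pruning, here is a minimal
Python sketch. It is not Hudi code: the helpers `row_group_ranges` and
`groups_to_scan`, the key values, and the row-group size are all made up for
this example. It assumes a reader that can skip a block (row group) when the
block's min/max for the key does not overlap the query range.

```python
import random

def row_group_ranges(keys, rows_per_group):
    """Split one file's keys into row groups and return (min, max) per group."""
    groups = [keys[i:i + rows_per_group] for i in range(0, len(keys), rows_per_group)]
    return [(min(g), max(g)) for g in groups]

def groups_to_scan(ranges, lo, hi):
    """Count row groups whose [min, max] overlaps the query range [lo, hi]."""
    return sum(1 for g_min, g_max in ranges if g_min <= hi and g_max >= lo)

random.seed(0)
keys = list(range(1000))
random.shuffle(keys)  # records in arrival order within a single file

unsorted_ranges = row_group_ranges(keys, rows_per_group=100)
sorted_ranges = row_group_ranges(sorted(keys), rows_per_group=100)

# Query: 200 <= key <= 249
unsorted_scanned = groups_to_scan(unsorted_ranges, 200, 249)
sorted_scanned = groups_to_scan(sorted_ranges, 200, 249)

# With the sorted layout only the row group holding keys 200..299 overlaps.
print("row groups scanned (unsorted):", unsorted_scanned)
print("row groups scanned (sorted):  ", sorted_scanned)
```

The file size is the same in both cases; only the order of records inside the
file changes, which is what narrows the per-block min/max ranges.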

Regards,
Felix K Jose
From: Sivabalan <n.siv...@gmail.com>
Date: Monday, February 15, 2021 at 12:52 PM
To: dev@hudi.apache.org <dev@hudi.apache.org>
Subject: Re: [DISCUSS] Improve data locality during ingestion
Caution: This e-mail originated from outside of Philips, be careful for 
phishing.


Good discussion points.
Basically, at a high level, we are looking to propose sub-partitioning (the
locality columns described above, i.e. how to route inserts within a
partition) and an optional ordering property (sorting, as proposed above)
for inserts.

For sub-partitioning, I guess there will be a non-trivial amount of work in
Hudi, as we have a max size limit for a given data file. So, let's say the
sub-partitioning (locality column) field is city_id: for large cities, one
data file may not suffice to hold all of a city's records in a given
partition. The number of data files per city_id should grow dynamically as
well. The same applies in the other direction too: if there are a lot of
small cities, it may not make sense to have one file per city_id.
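
A rough Python sketch of that sizing concern, under stated assumptions: sizes
are plain record counts, `plan_files` is a hypothetical helper (not a Hudi
API), and small cities are packed greedily in city_id order. A large city is
spread over as many files as the limit requires, while small cities share
files.

```python
import math

def plan_files(city_sizes, max_records_per_file):
    """Return a list of files; each file is a list of (city_id, record_count)."""
    files = []
    small = []  # cities that fit below the file size limit
    for city_id, n in sorted(city_sizes.items()):
        if n >= max_records_per_file:
            # Large city: spread evenly over as many files as needed.
            num_files = math.ceil(n / max_records_per_file)
            base, extra = divmod(n, num_files)
            for i in range(num_files):
                files.append([(city_id, base + (1 if i < extra else 0))])
        else:
            small.append((city_id, n))
    # Small cities: pack neighbouring city_ids together until a file is full.
    current, current_size = [], 0
    for city_id, n in small:
        if current and current_size + n > max_records_per_file:
            files.append(current)
            current, current_size = [], 0
        current.append((city_id, n))
        current_size += n
    if current:
        files.append(current)
    return files

plan = plan_files(
    {"city_a": 250, "city_b": 30, "city_c": 40, "city_d": 50},
    max_records_per_file=100,
)
for f in plan:
    print(f)
```

Here city_a needs three files, while city_b and city_c end up sharing one, so
the number of files per city_id is not fixed up front.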

I guess that's why range partitioning was proposed in the thread above.
Anyway, this is definitely a good addition to Hudi.
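
For comparison, the two routing options mentioned in the thread (hash-modulo
and range partitioning) could look roughly like the Python sketch below. The
function names, the file group IDs, and the range boundaries are all
hypothetical, and `zlib.crc32` is just a stand-in for whatever stable hash
would actually be used.

```python
import bisect
import zlib

def hash_modulo_file(key, file_ids):
    """Route a key to a file group via a stable hash modulo the number of groups."""
    # crc32 is used because Python's built-in hash() of str varies per process.
    return file_ids[zlib.crc32(key.encode("utf-8")) % len(file_ids)]

def range_file(key, boundaries, file_ids):
    """Route a key by range.

    boundaries[i] is the exclusive upper bound of file_ids[i];
    the last file group takes every key at or above the last boundary.
    """
    return file_ids[bisect.bisect_right(boundaries, key)]

file_ids = ["fg-0", "fg-1", "fg-2", "fg-3"]
boundaries = ["g", "n", "t"]  # 3 boundaries split the key space into 4 ranges

print(hash_modulo_file("session-42", file_ids))
print(range_file("amsterdam", boundaries, file_ids))
print(range_file("zurich", boundaries, file_ids))
```

Hash-modulo needs a fixed number of file groups known in advance, whereas
range boundaries can be split or merged as a key range grows or shrinks,
which is closer to the dynamic sizing described above.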



On Fri, Feb 12, 2021 at 11:53 AM Vinoth Chandar <vin...@apache.org> wrote:

> Makes a lot of sense to add IMO.
>
> Satish, since you started this thread, what do you suggest as next steps?
> Does this deserve an RFC?
>
> On Wed, Feb 10, 2021 at 5:00 AM Kizhakkel Jose, Felix
> <felix.j...@philips.com.invalid> wrote:
>
> > Hi Vinoth,
> >
> > Yes that’s exactly what I would like to have. Thank you for seeking
> > clarification.
> >
> > Regards,
> > Felix K Jose
> > From: Vinoth Chandar <vin...@apache.org>
> > Date: Tuesday, February 9, 2021 at 10:44 PM
> > To: dev <dev@hudi.apache.org>
> > Subject: Re: [DISCUSS] Improve data locality during ingestion
> >
> >
> > Hi Felix,
> >
> > Good to hear from you. So, trying to understand your requirement: if we
> > sorted all inserts in a commit by some configurable list of fields, while
> > sending the updates to their respective file groups, would that suffice?
> >
> > Then you could periodically cluster if you need to change the sort order
> > more globally across the entire partition/table.
> >
> > Satish, if this is the case, it can be one more "layout optimization"
> > strategy which is sort based, as opposed to bucketing/hash-modulo
> > grouping.
> >
> > This is an awesome discussion folks, let's keep going :)
> >
> > Thanks
> > Vinoth
> >
> > On Tue, Feb 9, 2021 at 5:44 PM Kizhakkel Jose, Felix
> > <felix.j...@philips.com.invalid> wrote:
> >
> > > Hello All,
> > > I would like to sort the records in each file of a COW table by a given
> > > key while ingesting/writing data. I am using the Spark data source +
> > > Kafka (Structured Streaming).
> > > Hudi does a great job of getting each file to the optimal file size (by
> > > compaction and by appending data to smaller files), so when a file
> > > reaches its optimal size, the records in that file should also be sorted
> > > if a sort/order-by column is provided. That way I don't have to do
> > > another round of processing on the optimized file.
> > >
> > > Regards,
> > > Felix K Jose
> > > From: Rubens Rodrigues <rubenssoto2...@gmail.com>
> > > Date: Tuesday, February 9, 2021 at 8:36 PM
> > > To: dev@hudi.apache.org <dev@hudi.apache.org>
> > > Subject: Re: [DISCUSS] Improve data locality during ingestion
> > >
> > >
> > > Hi guys,
> > >
> > > Talking about my use case...
> > >
> > > I have datasets where ordering the data by date makes a lot of sense, or
> > > ordering by some id so that fewer files are touched in merge operations.
> > > When I used Delta Lake, I always bootstrapped tables ordered by one of
> > > these fields, and it helped a lot with file pruning.
> > >
> > > Hudi clustering does this job, but I think it is an unnecessary extra
> > > step after a bulk insert, because all the data has to be rewritten.
> > >
> > >
> > >
> > > On Tue, Feb 9, 2021 at 21:53, Vinoth Chandar <vin...@apache.org>
> > > wrote:
> > >
> > > > Hi Satish,
> > > >
> > > > Been meaning to respond to this. I think I like the idea overall.
> > > >
> > > > Here's my (hopefully correct) understanding; let me know if I am
> > > > getting this right.
> > > >
> > > > Predominantly, we are just talking about the problem of where we send
> > > > the "inserts" to.
> > > >
> > > > Today the upsert partitioner does the file sizing/bin-packing etc. for
> > > > inserts and then sends some inserts over to existing file groups to
> > > > maintain file size.
> > > > We can abstract all of this into strategies and some kind of pipeline
> > > > abstractions, and have it also consider "affinity" to an existing file
> > > > group based on, say, information stored in the metadata table?
> > > >
> > > > I think this is complementary to what we do today and can be helpful.
> > > > The first thing may be to abstract the existing write pipeline as a
> > > > series of "optimization" stages and bring things like file sizing
> > > > under that.
> > > >
> > > > On bucketing, I am not against Hive bucketing or anything. But with
> > > > record-level indexes and the granular/micro partitions that we can
> > > > achieve using clustering, is it still the most efficient design? That's
> > > > a question I would love to find answers for. I never liked the
> > > > static/hash partitioning based schemes in bucketing. They introduce a
> > > > lot of manual data munging if things change.
> > > >
> > > > Thanks
> > > > Vinoth
> > > >
> > > >
> > > >
> > > > On Wed, Feb 3, 2021 at 5:17 PM Satish Kotha <satishko...@uber.com.invalid>
> > > > wrote:
> > > >
> > > > > > I got some feedback that this thread may be a bit complex to
> > > > > > understand, so I tried to simplify the proposal as below:
> > > > > >
> > > > > > Users can already specify 'partitionpath' using this config
> > > > > > <https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY>
> > > > > > when writing data. My proposal is that we also give users the
> > > > > > ability to identify (or hint at) the 'fileId' while writing the
> > > > > > data. For example, users can say 'locality.columns: session_id'.
> > > > > > We deterministically map every session_id to a specific fileGroup
> > > > > > in Hudi (using hash-modulo or range-partitioning etc). So all
> > > > > > records for a session_id are co-located in the same data/log file.
> > > > >
> > > > > Hopefully, this explains the idea better. Appreciate any feedback.
> > > > >
> > > > > > On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <satishko...@uber.com>
> > > > > > wrote:
> > > > >
> > > > > > > Hello,
> > > > > > >
> > > > > > > Clustering
> > > > > > > <https://hudi.apache.org/blog/hudi-clustering-intro/> is a great
> > > > > > > feature for improving data locality. But it has a (relatively
> > > > > > > big) cost to rewrite the data after ingestion. I think there are
> > > > > > > other ways to improve data locality during ingestion. For
> > > > > > > example, we can add a new Index (or partitioner) that reads
> > > > > > > values for columns that are important from a data locality
> > > > > > > perspective. We could then compute hash modulo on the value and
> > > > > > > use that to deterministically identify the file group that the
> > > > > > > record has to be written into.
> > > > > >
> > > > > > > More detailed example:
> > > > > > > Assume we introduce 2 new configs:
> > > > > > >
> > > > > > > # Controls the total number of fileIds allowed (per partition).
> > > > > > > hoodie.datasource.write.num.file.groups: "N"
> > > > > > >
> > > > > > > # Identifies columns that are important for data locality.
> > > > > > > hoodie.datasource.write.locality.columns: "session_id,timestamp"
> > > > > > >
> > > > > > > (I can come up with better names for the configs if the general
> > > > > > > idea sounds good.)
> > > > > > >
> > > > > > > During ingestion, we generate 'N' fileIds for each partition (if
> > > > > > > that partition already has K fileIds, we generate N-K new
> > > > > > > fileIds). Let's say these fileIds are stored in a fileIdList data
> > > > > > > structure. For each row, we compute
> > > > > > > 'hash(row.get(session_id)+row.get(timestamp)) % N'. This value
> > > > > > > is used as the index into the fileIdList data structure to
> > > > > > > deterministically identify the file group for the row.
> > > > > >
> > > > > > > This improves data locality by ensuring that rows with a given
> > > > > > > value for these columns are stored in the same file. This
> > > > > > > hashing could be done in two places:
> > > > > > > 1) A custom index that tags the location for each row based on
> > > > > > > values for 'session_id+timestamp'.
> > > > > > > 2) A new partitioner that assigns buckets for each row based on
> > > > > > > values for 'session_id+timestamp'.
> > > > > >
> > > > > > > *Advantages:*
> > > > > > > 1) No need to rewrite data for improving data locality.
> > > > > > > 2) Integrates well with Hive bucketing (Spark is also adding
> > > > > > > support for Hive bucketing
> > > > > > > <https://issues.apache.org/jira/browse/SPARK-19256>).
> > > > > > > 3) This reduces scan cycles to find a particular key, because it
> > > > > > > ensures that the key is present in a certain fileId. Similarly,
> > > > > > > joining across multiple tables would be efficient if they all
> > > > > > > choose the same 'locality.columns'.
> > > > > >
> > > > > > > *Disadvantages:*
> > > > > > > 1) Users need to know the total number of file groups to
> > > > > > > generate per partition. This value is assumed to be static for
> > > > > > > all partitions. So if significant changes are expected in
> > > > > > > traffic volume, this may not partition the data well. (We could
> > > > > > > also consider making this static per partition, which adds
> > > > > > > additional complexity, but is feasible to do.)
> > > > > > > 2) This may not be as efficient as clustering. For example, data
> > > > > > > for a given column value is guaranteed to be co-located in the
> > > > > > > same file, but it may not be in the same block (row group in
> > > > > > > parquet). So more blocks need to be read by query engines.
> > > > > >
> > > > > >
> > > > > > > Clustering can still be useful for other use cases such as
> > > > > > > stitching files, transforming data for efficiency etc.
> > > > > > > Clustering can also be useful for a few sorting scenarios, e.g.,
> > > > > > > if users cannot predict a good value for the number of file
> > > > > > > groups needed.
> > > > > > >
> > > > > > > Appreciate any feedback. Let me know if you have other ideas on
> > > > > > > improving data locality. If you are interested in this idea and
> > > > > > > want to collaborate, please reach out.
> > > > > >
> > > > > > Thanks
> > > > > > Satish
> > > > > >
> > > > >
> > > >
> > >
> > >
> >
> >
>


--
Regards,
-Sivabalan

________________________________
The information contained in this message may be confidential and legally 
protected under applicable law. The message is intended solely for the 
addressee(s). If you are not the intended recipient, you are hereby notified 
that any use, forwarding, dissemination, or reproduction of this message is 
strictly prohibited and may be unlawful. If you are not the intended recipient, 
please contact the sender by return e-mail and destroy all copies of the 
original message.
