Good discussion points.
At a high level, we are looking to propose two things: sub-partitioning
(the locality columns described above), which controls how inserts are
routed within a partition, and an optional ordering property (the sorting
proposed above) for inserts.

For sub-partitioning, I guess there will be a non-trivial amount of work in
Hudi, since we have a max size limit for a given data file. Say the
sub-partitioning (locality column) field is city_id: for large cities, one
data file may not suffice to hold all of a city's records in a given
partition, so the number of data files per city_id should grow dynamically.
The same applies in the other direction: if there are a lot of small cities,
it may not make sense to have one file per city_id.
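
To make the sizing concern concrete, here is a rough sketch. It is not Hudi
code: `plan_file_groups`, `MAX_FILE_BYTES` and the sizes are made up purely
for illustration. It splits a large city across several files and packs
small cities together so that no file exceeds the max size.

```python
import math

# Hypothetical max data file size; not a real Hudi default.
MAX_FILE_BYTES = 120 * 1024 * 1024


def plan_file_groups(bytes_per_city, max_file_bytes=MAX_FILE_BYTES):
    """Return a list of files; each file is a list of (city_id, bytes) chunks."""
    files = []
    small = []
    for city_id, size in sorted(bytes_per_city.items()):
        if size >= max_file_bytes:
            # Large city: spread it over as many files as the size limit needs.
            num_files = math.ceil(size / max_file_bytes)
            chunk = math.ceil(size / num_files)
            remaining = size
            for _ in range(num_files):
                take = min(chunk, remaining)
                files.append([(city_id, take)])
                remaining -= take
        else:
            small.append((city_id, size))

    # Small cities: first-fit decreasing bin packing into shared files.
    bins = []  # each entry is [bytes_used, [(city_id, bytes), ...]]
    for city_id, size in sorted(small, key=lambda item: -item[1]):
        for used_and_chunks in bins:
            if used_and_chunks[0] + size <= max_file_bytes:
                used_and_chunks[0] += size
                used_and_chunks[1].append((city_id, size))
                break
        else:
            bins.append([size, [(city_id, size)]])
    files.extend(chunks for _, chunks in bins)
    return files
```

The point is only that the file count per city_id cannot be a fixed number;
it has to follow the data volume in both directions.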

I guess that's why range partitioning was proposed in the thread above.
Anyway, this is definitely a good addition to Hudi.
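
For comparison, a minimal sketch of the two routing schemes discussed in
this thread. The helper names and the choice of CRC32 as the hash are my
assumptions, not anything Hudi does today.

```python
import bisect
import zlib


def hash_bucket(key, num_file_groups):
    """Hash-modulo routing: a stable hash of the key, modulo N file groups."""
    return zlib.crc32(str(key).encode("utf-8")) % num_file_groups


def range_bucket(key, split_points):
    """Range routing: index of the range that `key` falls into.

    `split_points` must be sorted; range i covers keys in
    [split_points[i-1], split_points[i]).
    """
    return bisect.bisect_right(split_points, key)
```

With ranges, a range that grows too large can be split by adding one split
point, and only the keys in that range move. With hash-modulo, changing N
remaps most keys.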



On Fri, Feb 12, 2021 at 11:53 AM Vinoth Chandar <vin...@apache.org> wrote:

> Makes a lot of sense to add IMO.
>
> Satish, since you started this thread, what do you suggest as next steps?
> Does this deserve an RFC?
>
> On Wed, Feb 10, 2021 at 5:00 AM Kizhakkel Jose, Felix
> <felix.j...@philips.com.invalid> wrote:
>
> > Hi Vinoth,
> >
> > Yes that’s exactly what I would like to have. Thank you for seeking
> > clarification.
> >
> > Regards,
> > Felix K Jose
> > From: Vinoth Chandar <vin...@apache.org>
> > Date: Tuesday, February 9, 2021 at 10:44 PM
> > To: dev <dev@hudi.apache.org>
> > Subject: Re: [DISCUSS] Improve data locality during ingestion
> > Caution: This e-mail originated from outside of Philips, be careful for
> > phishing.
> >
> >
> > Hi Felix,
> >
> > Good to hear from you. I am trying to understand your requirement. If we
> > sorted all inserts in a commit by some configurable list of fields, while
> > sending the updates to their respective file groups, would that suffice?
> >
> > Then you could periodically cluster if you need to change the sort order
> > more globally across the entire partition/table.
> >
> > Satish, if this is the case, it can be one more "layout optimization"
> > strategy which is sort based, as opposed to bucketing/hash modulo
> > grouping.
> >
> > This is an awesome discussion folks, let's keep going :)
> >
> > Thanks
> > Vinoth
> >
> > On Tue, Feb 9, 2021 at 5:44 PM Kizhakkel Jose, Felix
> > <felix.j...@philips.com.invalid> wrote:
> >
> > > Hello All,
> > > I would like to sort records in each file of a COW table by a given key
> > > while ingesting/writing data. I am using Spark Data source + Kafka
> > > (Structured Streaming).
> > > Hudi does a great job of getting each file to the optimal file size
> > > (by compaction and by appending data to smaller files), so when a file
> > > reaches its optimal size, the records in that file should also be sorted
> > > if a sort/order-by column is provided. That way I don't have to do
> > > another round of processing on the optimized file.
> > >
> > > Regards,
> > > Felix K Jose
> > > From: Rubens Rodrigues <rubenssoto2...@gmail.com>
> > > Date: Tuesday, February 9, 2021 at 8:36 PM
> > > To: dev@hudi.apache.org <dev@hudi.apache.org>
> > > Subject: Re: [DISCUSS] Improve data locality during ingestion
> > >
> > >
> > > Hi guys,
> > >
> > > Talking about my use case...
> > >
> > > I have datasets where ordering the data by date makes a lot of sense, or
> > > ordering by some id so that fewer files are touched on merge operations.
> > > When I used Delta Lake, I always bootstrapped tables ordered by one of
> > > these fields, and it helped a lot with file pruning.
> > >
> > > Hudi clustering does this job, but I think it is an unnecessary extra
> > > step after bulk insert, because all the data has to be rewritten again.
> > >
> > >
> > >
> > > On Tue, Feb 9, 2021 at 21:53, Vinoth Chandar <vin...@apache.org>
> > > wrote:
> > >
> > > > Hi Satish,
> > > >
> > > > Been meaning to respond to this. I think I like the idea overall.
> > > >
> > > > Here's my understanding (hopefully correct); let me know if I am
> > > > getting this right.
> > > >
> > > > Predominantly, we are just talking about the problem of: where do we
> > > > send the "inserts" to.
> > > >
> > > > Today the upsert partitioner does the file sizing/bin-packing etc. for
> > > > inserts and then sends some inserts over to existing file groups to
> > > > maintain file size.
> > > > We can abstract all of this into strategies and some kind of pipeline
> > > > abstractions, and have it also consider "affinity" to an existing file
> > > > group based on, say, information stored in the metadata table?
> > > >
> > > > I think this is complementary to what we do today and can be helpful.
> > > > The first thing may be to abstract the existing write pipeline as a
> > > > series of "optimization" stages and bring things like file sizing under
> > > > that.
> > > >
> > > > On bucketing, I am not against Hive bucketing or anything. But with
> > > > record-level indexes and the granular/micro partitions that we can
> > > > achieve using clustering, is it still the most efficient design? That's
> > > > a question I would love to find answers for. I never liked the
> > > > static/hash partitioning based schemes in bucketing. They introduce a
> > > > lot of manual data munging if things change.
> > > > Thanks
> > > > Vinoth
> > > >
> > > >
> > > >
> > > > On Wed, Feb 3, 2021 at 5:17 PM Satish Kotha <satishko...@uber.com.invalid>
> > > > wrote:
> > > >
> > > > > I got some feedback that this thread may be a bit complex to
> > > > > understand, so I tried to simplify the proposal as below:
> > > > >
> > > > > Users can already specify 'partitionpath' using this config
> > > > > <https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY>
> > > > > when writing data. My proposal is that we also give users the ability
> > > > > to identify (or hint at) the 'fileId' while writing the data. For
> > > > > example, users can say 'locality.columns: session_id'. We
> > > > > deterministically map every session_id to a specific fileGroup in Hudi
> > > > > (using hash-modulo, range-partitioning, etc.), so all values for a
> > > > > session_id are co-located in the same data/log file.
> > > > >
> > > > > Hopefully, this explains the idea better. Appreciate any feedback.
> > > > >
> > > > > On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <satishko...@uber.com>
> > > > > wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > Clustering
> > > > > > <https://hudi.apache.org/blog/hudi-clustering-intro/>
> > > > > > is a great feature for improving data locality, but it has a
> > > > > > (relatively big) cost: the data is rewritten after ingestion. I think
> > > > > > there are other ways to improve data locality during ingestion. For
> > > > > > example, we can add a new Index (or partitioner) that reads values for
> > > > > > columns that are important from a data locality perspective. We could
> > > > > > then compute a hash modulo on the value and use that to
> > > > > > deterministically identify the file group that the record has to be
> > > > > > written into.
> > > > > >
> > > > > > More detailed example:
> > > > > > Assume we introduce 2 new configs:
> > > > > > hoodie.datasource.write.num.file.groups: "N"
> > > > > > # Controls the total number of file Ids allowed (per partition).
> > > > > >
> > > > > > hoodie.datasource.write.locality.columns: "session_id,timestamp"
> > > > > > # Identifies columns that are important for data locality.
> > > > > >
> > > > > > (I can come up with better names for the configs if the general idea
> > > > > > sounds good).
> > > > > >
> > > > > > During ingestion, we generate 'N' fileIds for each partition (if that
> > > > > > partition already has K fileIds, we generate N-K new fileIds). Let's
> > > > > > say these fileIds are stored in a fileIdList data structure. For each
> > > > > > row, we compute 'hash(row.get(session_id)+row.get(timestamp)) % N'.
> > > > > > This value is used as the index into the fileIdList data structure to
> > > > > > deterministically identify the file group for the row.
> > > > > >
> > > > > > This improves data locality by ensuring that rows with a given value
> > > > > > for these columns are stored in the same file. This hashing could be
> > > > > > done in two places:
> > > > > > 1) A custom index that tags the location for each row based on the
> > > > > > values of 'session_id+timestamp'.
> > > > > > 2) A new partitioner that assigns buckets for each row based on the
> > > > > > values of 'session_id+timestamp'.
> > > > > >
> > > > > > *Advantages:*
> > > > > > 1) No need to rewrite data for improving data locality.
> > > > > > 2) Integrates well with Hive bucketing (Spark is also adding support
> > > > > > for Hive bucketing
> > > > > > <https://issues.apache.org/jira/browse/SPARK-19256>).
> > > > > > 3) This reduces scan cycles to find a particular key, because it
> > > > > > ensures that the key is present in a certain fileId. Similarly, joins
> > > > > > across multiple tables would be efficient if they all choose the same
> > > > > > 'locality.columns'.
> > > > > >
> > > > > > *Disadvantages:*
> > > > > > 1) Users need to know the total number of file groups to generate per
> > > > > > partition. This value is assumed to be static for all partitions, so
> > > > > > if significant changes are expected in traffic volume, this may not
> > > > > > partition the data well. (We could also consider making this static
> > > > > > per partition, which adds complexity but is feasible to do.)
> > > > > > 2) This may not be as efficient as clustering. For example, data for a
> > > > > > given column value is guaranteed to be co-located in the same file,
> > > > > > but it may not be in the same block (row group in Parquet), so more
> > > > > > blocks need to be read by query engines.
> > > > > >
> > > > > >
> > > > > > Clustering can still be useful for other use cases such as stitching
> > > > > > files, transforming data for efficiency, etc. Clustering can also be
> > > > > > useful for a few sorting scenarios, e.g., if users cannot predict a
> > > > > > good value for the number of file groups needed.
> > > > > >
> > > > > > Appreciate any feedback. Let me know if you have other ideas on
> > > > > > improving data locality. If you are interested in this idea and want
> > > > > > to collaborate, please reach out.
> > > > > >
> > > > > > Thanks
> > > > > > Satish
> > > > > >
> > > > >
> > > >
> > >
> > > ________________________________
> > > The information contained in this message may be confidential and
> legally
> > > protected under applicable law. The message is intended solely for the
> > > addressee(s). If you are not the intended recipient, you are hereby
> > > notified that any use, forwarding, dissemination, or reproduction of
> this
> > > message is strictly prohibited and may be unlawful. If you are not the
> > > intended recipient, please contact the sender by return e-mail and
> > destroy
> > > all copies of the original message.
> > >
> >
> >
>


-- 
Regards,
-Sivabalan
