Good discussion points. At a high level, we are looking to propose two properties for inserts: sub-partitioning (the locality columns described earlier in the thread), which decides how inserts are routed within a partition, and an optional ordering (the sorting proposed earlier in the thread).
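A small Python model of why the optional ordering is worth having even with sub-partitioning. Satish's proposal quoted below notes that co-locating a key in one file does not put it in one Parquet row group. The sketch assumes a reader skips row groups using per-group min/max statistics; `row_group_stats` and `groups_to_read` are hypothetical helpers, and integers stand in for real keys.

```python
from typing import List, Sequence, Tuple


def row_group_stats(keys: Sequence[int], rows_per_group: int) -> List[Tuple[int, int]]:
    """Split a file's keys (in write order) into consecutive row groups and keep
    each group's (min, max), a simplified stand-in for Parquet column statistics."""
    if rows_per_group <= 0:
        raise ValueError("rows_per_group must be positive")
    return [
        (min(keys[i : i + rows_per_group]), max(keys[i : i + rows_per_group]))
        for i in range(0, len(keys), rows_per_group)
    ]


def groups_to_read(stats: List[Tuple[int, int]], key: int) -> int:
    """Count row groups whose [min, max] range could contain key,
    i.e. the groups a reader cannot skip."""
    return sum(1 for lo, hi in stats if lo <= key <= hi)


# Same file contents, written in arrival order vs. sorted, 3 rows per group.
arrival_order = [3, 1, 2, 3, 1, 2, 3, 1, 2]
unsorted_reads = groups_to_read(row_group_stats(arrival_order, 3), 2)
sorted_reads = groups_to_read(row_group_stats(sorted(arrival_order), 3), 2)
print(unsorted_reads, sorted_reads)  # 3 1
```

In arrival order every row group spans the whole key range, so a lookup for key 2 cannot skip any group; after sorting, only one group qualifies. The same min/max reasoning applies at the file level, which is the file-pruning benefit Rubens mentions.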
For sub-partitioning, I guess there will be a non-trivial amount of work in Hudi, since we have a max size limit for a given data file. Say the sub-partitioning (locality) column is city_id: for large cities, one data file may not be enough to hold all of a city's records in a given partition, so the number of data files per city_id should grow dynamically. The reverse applies too: if there are a lot of small cities, it may not make sense to have one file per city_id. I guess that's why range partitioning was proposed earlier in the thread. Anyway, it's definitely a good addition to Hudi.

On Fri, Feb 12, 2021 at 11:53 AM Vinoth Chandar <vin...@apache.org> wrote:
> Makes a lot of sense to add IMO.
>
> Satish, since you proposed this thread, what do you suggest as next steps?
> Does this deserve an RFC?
>
> On Wed, Feb 10, 2021 at 5:00 AM Kizhakkel Jose, Felix
> <felix.j...@philips.com.invalid> wrote:
>
> > Hi Vinoth,
> >
> > Yes, that's exactly what I would like to have. Thank you for seeking
> > clarification.
> >
> > Regards,
> > Felix K Jose
> > From: Vinoth Chandar <vin...@apache.org>
> > Date: Tuesday, February 9, 2021 at 10:44 PM
> > To: dev <dev@hudi.apache.org>
> > Subject: Re: [DISCUSS] Improve data locality during ingestion
> >
> > Hi Felix,
> >
> > Good to hear from you. So, trying to understand your requirement: if we
> > sorted all inserts in a commit by some configurable list of fields, while
> > sending the updates to their respective file groups, would that suffice?
> >
> > Then you could periodically cluster if you need to change the sort order
> > more globally across the entire partition/table.
> >
> > Satish, if this is the case, it can be one more "layout optimization"
> > strategy which is sort based, as opposed to bucketing/hash modulo
> > grouping.
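The sizing concern above (large city_ids needing several files, small ones sharing a file) fits naturally with the sort-based strategy Vinoth describes. A minimal sketch, assuming a record-count cap as a stand-in for Hudi's byte-based max file size: sort the inserts by the locality column, then cut the sorted run into consecutive files, so each file covers a contiguous key range. `sort_and_cut` and `key_ranges` are hypothetical helpers, not Hudi APIs.

```python
from typing import Any, Dict, List, Sequence, Tuple

Record = Dict[str, Any]


def sort_and_cut(
    inserts: List[Record], sort_columns: Sequence[str], max_records_per_file: int
) -> List[List[Record]]:
    """Hypothetical helper: sort the inserts, then cut the sorted run into
    consecutive files of at most max_records_per_file records each.

    A record count stands in for Hudi's byte-based max file size (assumption).
    """
    if max_records_per_file <= 0:
        raise ValueError("max_records_per_file must be positive")
    ordered = sorted(inserts, key=lambda r: tuple(r[c] for c in sort_columns))
    return [
        ordered[i : i + max_records_per_file]
        for i in range(0, len(ordered), max_records_per_file)
    ]


def key_ranges(files: List[List[Record]], column: str) -> List[Tuple[Any, Any]]:
    """(first, last) value of column in each file, i.e. the key range it covers."""
    return [(f[0][column], f[-1][column]) for f in files]


# One large city (city_id 1) and three small ones, capped at 3 records per file.
inserts = [{"city_id": 1, "n": i} for i in range(5)] + [
    {"city_id": c, "n": 0} for c in (4, 2, 3)
]
files = sort_and_cut(inserts, ["city_id"], 3)
print(key_ranges(files, "city_id"))  # [(1, 1), (1, 2), (3, 4)]
```

City 1 spills over into a second file while cities 3 and 4 share one, so the number of files per city_id follows the data instead of a fixed count. The sort only covers one batch of inserts, so, as Vinoth notes, periodic clustering would still be needed to keep the order global across the partition.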
> >
> > This is an awesome discussion, folks; let's keep going :)
> >
> > Thanks
> > Vinoth
> >
> > On Tue, Feb 9, 2021 at 5:44 PM Kizhakkel Jose, Felix
> > <felix.j...@philips.com.invalid> wrote:
> >
> > > Hello All,
> > > I would like to sort records in each file of a COW table by a given key
> > > while ingesting/writing data. I am using the Spark data source + Kafka
> > > (Structured Streaming).
> > > Hudi does a great job of getting each file to the optimal file size
> > > (by compaction and appending data to smaller files), so when a file
> > > reaches its optimal size, the records in that file should also be
> > > sorted if a sort/order-by column is provided. That way I don't have to
> > > do another round of processing on the optimized file.
> > >
> > > Regards,
> > > Felix K Jose
> > > From: Rubens Rodrigues <rubenssoto2...@gmail.com>
> > > Date: Tuesday, February 9, 2021 at 8:36 PM
> > > To: dev@hudi.apache.org <dev@hudi.apache.org>
> > > Subject: Re: [DISCUSS] Improve data locality during ingestion
> > >
> > > Hi guys,
> > >
> > > Talking about my use case...
> > >
> > > I have datasets where ordering data by date makes a lot of sense, or
> > > ordering by some id so that fewer files are touched on merge operations.
> > > When I used Delta Lake, I always bootstrapped tables ordered by one of
> > > these fields, and it helped a lot with file pruning.
> > >
> > > Hudi clustering does this job, but I think it is an unnecessary extra
> > > step after bulk insert, because all the data needs to be rewritten again.
> > >
> > > On Tue, Feb 9, 2021, 21:53 Vinoth Chandar <vin...@apache.org>
> > > wrote:
> > >
> > > > Hi Satish,
> > > >
> > > > Been meaning to respond to this. I think I like the idea overall.
> > > >
> > > > Here's my (hopefully correct) understanding; let me know if I am
> > > > getting this right.
> > > >
> > > > Predominantly, we are just talking about the problem of where we send
> > > > the "inserts" to.
> > > >
> > > > Today, the upsert partitioner does the file sizing/bin-packing etc. for
> > > > inserts and then sends some inserts over to existing file groups to
> > > > maintain file size.
> > > > We can abstract all of this into strategies and some kind of pipeline
> > > > abstraction, and have it also consider "affinity" to an existing file
> > > > group based on, say, information stored in the metadata table?
> > > >
> > > > I think this is complementary to what we do today and can be helpful.
> > > > The first thing may be to abstract the existing write pipeline as a
> > > > series of "optimization" stages and bring things like file sizing
> > > > under that.
> > > >
> > > > On bucketing, I am not against Hive bucketing or anything. But with
> > > > record-level indexes and the granular/micro partitions that we can
> > > > achieve using clustering, is it still the most efficient design?
> > > > That's a question I would love to find answers for. I never liked the
> > > > static/hash partitioning based schemes in bucketing; they introduce a
> > > > lot of manual data munging if things change.
> > > >
> > > > Thanks
> > > > Vinoth
> > > >
> > > > On Wed, Feb 3, 2021 at 5:17 PM Satish Kotha <satishko...@uber.com.invalid>
> > > > wrote:
> > > >
> > > > > I got some feedback that this thread may be a bit complex to
> > > > > understand.
> > > > > So I tried to simplify the proposal as below:
> > > > >
> > > > > Users can already specify 'partitionpath' using this config
> > > > > <https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY>
> > > > > when writing data. My proposal is that we also give users the ability
> > > > > to identify (or hint at) the 'fileId' while writing the data. For
> > > > > example, users can say 'locality.columns: session_id'. We
> > > > > deterministically map every session_id to a specific fileGroup in
> > > > > Hudi (using hash-modulo or range-partitioning etc.), so all values
> > > > > for a session_id are co-located in the same data/log file.
> > > > >
> > > > > Hopefully, this explains the idea better. Appreciate any feedback.
> > > > >
> > > > > On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <satishko...@uber.com>
> > > > > wrote:
> > > > >
> > > > > > Hello,
> > > > > >
> > > > > > Clustering <https://hudi.apache.org/blog/hudi-clustering-intro/>
> > > > > > is a great feature for improving data locality. But it has a
> > > > > > (relatively big) cost to rewrite the data after ingestion.
> > > > > > I think there are other ways to improve data locality during
> > > > > > ingestion. For example, we can add a new Index (or partitioner)
> > > > > > that reads values for columns that are important from a data
> > > > > > locality perspective. We could then compute hash modulo on the
> > > > > > value and use that to deterministically identify the file group
> > > > > > that the record has to be written into.
> > > > > >
> > > > > > More detailed example:
> > > > > > Assume we introduce 2 new configs:
> > > > > > hoodie.datasource.write.num.file.groups: "N" #Controls the total
> > > > > > number of file Ids allowed (per partition).
> > > > > >
> > > > > > hoodie.datasource.write.locality.columns: "session_id,timestamp"
> > > > > > #Identify columns that are important for data locality.
> > > > > >
> > > > > > (I can come up with better names for the configs if the general
> > > > > > idea sounds good.)
> > > > > >
> > > > > > During ingestion, we generate 'N' fileIds for each partition (if
> > > > > > that partition already has K fileIds, we generate N-K new fileIds).
> > > > > > Let's say these fileIds are stored in a fileIdList data structure.
> > > > > > For each row, we compute
> > > > > > 'hash(row.get(session_id)+row.get(timestamp)) % N'. This value is
> > > > > > used as the index into fileIdList to deterministically identify
> > > > > > the file group for the row.
> > > > > >
> > > > > > This improves data locality by ensuring that rows with a given
> > > > > > column value are stored in the same file. This hashing could be
> > > > > > done in two places:
> > > > > > 1) A custom index that tags the location for each row based on
> > > > > > values for 'session_id+timestamp'.
> > > > > > 2) A new partitioner that assigns buckets for each row based on
> > > > > > values for 'session_id+timestamp'.
> > > > > >
> > > > > > *Advantages:*
> > > > > > 1) No need to rewrite data to improve data locality.
> > > > > > 2) Integrates well with Hive bucketing (Spark is also adding support
> > > > > > for Hive bucketing <https://issues.apache.org/jira/browse/SPARK-19256>).
> > > > > > 3) This reduces scan cycles to find a particular key, because it
> > > > > > ensures that the key is present in a certain fileId. Similarly,
> > > > > > joining across multiple tables would be efficient if they both
> > > > > > choose the same 'locality.columns'.
> > > > > >
> > > > > > *Disadvantages:*
> > > > > > 1) Users need to know the total number of file groups to generate
> > > > > > per partition. This value is assumed to be static for all
> > > > > > partitions, so if significant changes are expected in traffic
> > > > > > volume, this may not partition the data well. (We could also
> > > > > > consider making this a static value per partition, which adds
> > > > > > complexity but is feasible.)
> > > > > > 2) This may not be as efficient as clustering. For example, data
> > > > > > for a given column value is guaranteed to be co-located in the
> > > > > > same file, but it may not be in the same block (row group in
> > > > > > Parquet), so more blocks need to be read by query engines.
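The file-group assignment described in this proposal can be sketched in Python as follows. This is a minimal illustration, not Hudi code: `bucket_index`, `ensure_file_ids` and `assign_file_id` are hypothetical names, and `uuid4` stands in for however Hudi actually generates file ids. Instead of concatenating the raw values as in 'hash(row.get(session_id)+row.get(timestamp))', the sketch joins them with a separator so that, e.g., ("ab", "c") and ("a", "bc") do not produce the same key, and it uses a content digest so the mapping is stable across processes.

```python
import hashlib
import uuid
from typing import Any, Dict, List, Sequence


def bucket_index(values: Sequence[Any], n: int) -> int:
    """Deterministic hash-modulo over the locality-column values.

    Python's built-in hash() is salted per process for str, so a SHA-256
    digest of the joined values is used instead.
    """
    if n <= 0:
        raise ValueError("N must be positive")
    key = "\x1f".join(str(v) for v in values).encode("utf-8")
    return int.from_bytes(hashlib.sha256(key).digest()[:8], "big") % n


def ensure_file_ids(existing: List[str], n: int) -> List[str]:
    """Top up a partition to N file ids: if K already exist, add N-K new ones.

    Existing ids keep their positions; uuid4 is a stand-in for real file ids.
    """
    if len(existing) > n:
        raise ValueError("partition already has more than N file groups")
    return existing + [str(uuid.uuid4()) for _ in range(n - len(existing))]


def assign_file_id(
    row: Dict[str, Any], locality_columns: Sequence[str], file_id_list: List[str]
) -> str:
    """Pick the file group for a row: fileIdList[hash(values) % N]."""
    values = [row[c] for c in locality_columns]
    return file_id_list[bucket_index(values, len(file_id_list))]


# Partition with K=2 existing file groups, configured for N=4.
file_ids = ensure_file_ids(["fg-0", "fg-1"], 4)
row = {"session_id": "s-42", "timestamp": 1612224000}
target = assign_file_id(row, ["session_id", "timestamp"], file_ids)
```

Because `bucket_index` depends only on the column values and N, two tables that use the same locality columns and the same N send matching keys to the same bucket number, which is what makes the cheaper joins in advantage 3 possible. The order of the file-id list also has to stay stable between commits; otherwise the same key would land in a different file group.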
> > > > > >
> > > > > > Clustering can still be useful for other use cases such as
> > > > > > stitching files, transforming data for efficiency etc. Clustering
> > > > > > can also be useful for a few sorting scenarios, e.g., if users
> > > > > > cannot predict a good value for the number of file groups needed.
> > > > > >
> > > > > > Appreciate any feedback. Let me know if you have other ideas on
> > > > > > improving data locality. If you are interested in this idea and
> > > > > > want to collaborate, please reach out.
> > > > > >
> > > > > > Thanks
> > > > > > Satish
> > >
> > > ________________________________
> > > The information contained in this message may be confidential and legally
> > > protected under applicable law. The message is intended solely for the
> > > addressee(s). If you are not the intended recipient, you are hereby
> > > notified that any use, forwarding, dissemination, or reproduction of this
> > > message is strictly prohibited and may be unlawful. If you are not the
> > > intended recipient, please contact the sender by return e-mail and destroy
> > > all copies of the original message.
> >

--
Regards,
-Sivabalan