Hi Satish,

Been meaning to respond to this. I think I like the idea overall.
Here's (hopefully) my understanding of the idea; let me know if I am getting this right. Predominantly, we are talking about the problem of where to send the "inserts". Today the upsert partitioner does the file sizing/bin-packing etc. for inserts and then sends some inserts over to existing file groups to maintain file sizes. We can abstract all of this into strategies and some kind of pipeline abstraction, and have it also consider "affinity" to an existing file group based on, say, information stored in the metadata table? I think this is complementary to what we do today and can be helpful. A first step may be to abstract the existing write pipeline as a series of "optimization" stages and bring things like file sizing under that.

On bucketing, I am not against Hive bucketing or anything. But with record-level indexes and the granular/micro partitions that we can achieve using clustering, is it still the most efficient design? That's a question I would love to find answers for. I never liked the static/hash-partitioning-based schemes in bucketing; they introduce a lot of manual data munging if things change.

Thanks
Vinoth

On Wed, Feb 3, 2021 at 5:17 PM Satish Kotha <satishko...@uber.com.invalid> wrote:

> I got some feedback that this thread may be a bit complex to understand,
> so I tried to simplify the proposal to the below:
>
> Users can already specify 'partitionpath' using this config
> <https://hudi.apache.org/docs/configurations.html#PARTITIONPATH_FIELD_OPT_KEY>
> when writing data. My proposal is that we also give users the ability to
> identify (or hint at) the 'fileId' while writing the data. For example,
> users can say 'locality.columns: session_id'. We deterministically map
> every session_id to a specific fileGroup in Hudi (using hash-modulo or
> range-partitioning etc.), so all values for a session_id are co-located
> in the same data/log file.
>
> Hopefully, this explains the idea better. Appreciate any feedback.
>
> On Mon, Feb 1, 2021 at 3:43 PM Satish Kotha <satishko...@uber.com> wrote:
>
> > Hello,
> >
> > Clustering <https://hudi.apache.org/blog/hudi-clustering-intro/> is a
> > great feature for improving data locality, but it has a (relatively
> > big) cost of rewriting the data after ingestion. I think there are
> > other ways to improve data locality during ingestion. For example, we
> > can add a new index (or partitioner) that reads values for columns that
> > are important from a data locality perspective. We could then compute a
> > hash modulo on the value and use that to deterministically identify the
> > file group that the record has to be written into.
> >
> > More detailed example:
> > Assume we introduce 2 new configs:
> >
> > hoodie.datasource.write.num.file.groups: "N" # Controls the total
> > number of fileIds allowed (per partition).
> >
> > hoodie.datasource.write.locality.columns: "session_id,timestamp"
> > # Identifies columns that are important for data locality.
> >
> > (I can come up with better names for the configs if the general idea
> > sounds good.)
> >
> > During ingestion, we generate 'N' fileIds for each partition (if that
> > partition already has K fileIds, we generate N-K new fileIds). Let's
> > say these fileIds are stored in a fileIdList data structure. For each
> > row, we compute 'hash(row.get(session_id) + row.get(timestamp)) % N'.
> > This value is used as the index into the fileIdList data structure to
> > deterministically identify the file group for the row.
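> >
> > To make this concrete, the mapping could look roughly like the sketch
> > below (pseudo-Java; LocalityBucketer is a placeholder name, not an
> > existing Hudi class):
> >
> > import java.util.List;
> > import org.apache.spark.sql.Row;
> >
> > // Picks a deterministic file group for a row based on the configured
> > // locality columns ('session_id' and 'timestamp' in this example).
> > class LocalityBucketer {
> >   private final List<String> fileIdList; // the N fileIds generated for this partition
> >
> >   LocalityBucketer(List<String> fileIdList) {
> >     this.fileIdList = fileIdList;
> >   }
> >
> >   String fileIdFor(Row row) {
> >     // key = concatenation of the configured locality-column values
> >     Object sessionId = row.getAs("session_id");
> >     Object timestamp = row.getAs("timestamp");
> >     String key = "" + sessionId + timestamp;
> >     // hash(...) % N; floorMod keeps the bucket index non-negative
> >     int bucket = Math.floorMod(key.hashCode(), fileIdList.size());
> >     return fileIdList.get(bucket); // same key always maps to the same file group
> >   }
> > }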
> >
> > This improves data locality by ensuring that rows with a given column
> > value are stored in the same file. This hashing could be done in two
> > places:
> > 1) A custom index that tags the location for each row based on values
> > for 'session_id+timestamp'.
> > 2) A new partitioner that assigns buckets for each row based on values
> > for 'session_id+timestamp'.
> >
> > *Advantages:*
> > 1) No need to rewrite data for improving data locality.
> > 2) Integrates well with Hive bucketing (Spark is also adding support
> > for Hive bucketing <https://issues.apache.org/jira/browse/SPARK-19256>).
> > 3) This reduces scan cycles to find a particular key because it ensures
> > that the key is present in a certain fileId. Similarly, joining across
> > multiple tables would be efficient if they both choose the same
> > 'locality.columns'.
> >
> > *Disadvantages:*
> > 1) Users need to know the total number of file groups to generate per
> > partition. This value is assumed to be static for all partitions, so if
> > significant changes are expected in traffic volume, this may not
> > partition the data well. (We could also consider making this static per
> > partition, which adds additional complexity, but is feasible to do.)
> > 2) This may not be as efficient as clustering. For example, data for a
> > given column value is guaranteed to be co-located in the same file, but
> > it may not be in the same block (row group in Parquet), so more blocks
> > need to be read by query engines.
> >
> > Clustering can still be useful for other use cases such as stitching
> > files, transforming data for efficiency, etc. Clustering can also be
> > useful for a few sorting scenarios - e.g., if users cannot predict a
> > good value for the number of file groups needed.
> >
> > Appreciate any feedback. Let me know if you have other ideas on
> > improving data locality. If you are interested in this idea and want to
> > collaborate, please reach out.
> >
> > Thanks
> > Satish
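For option 2 in the detailed example above (a new partitioner), a minimal
sketch of what this could look like against Spark's generic Partitioner API
follows; LocalityPartitioner is a made-up name, and it assumes the records
are keyed upstream by the concatenated locality-column values:

import org.apache.spark.Partitioner;

// Routes records to one of N buckets by hashing the locality key;
// each bucket can then be written out to a fixed file group.
public class LocalityPartitioner extends Partitioner {
  private final int numFileGroups; // hoodie.datasource.write.num.file.groups

  public LocalityPartitioner(int numFileGroups) {
    this.numFileGroups = numFileGroups;
  }

  @Override
  public int numPartitions() {
    return numFileGroups;
  }

  @Override
  public int getPartition(Object key) {
    // key is e.g. session_id + timestamp, built when keying the RDD
    return Math.floorMod(key.hashCode(), numFileGroups);
  }
}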